一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了Flink 的Changelog Streams与table 的集成2个示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
更多详细内容参考文章:
21、Flink 的table API与DataStream API 集成(完整版)
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- flink连接器 -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<!-- <scope>provided</scope> -->
</dependency>
</dependencies>
在内部,Flink的表运行时是一个changelog处理器。
StreamTableEnvironment提供了以下方法来暴露change data capture(CDC)功能:
fromChangelogStream(DataStream):将变更日志条目流(stream of changelog entries)解释为表。流记录类型必须为org.apache.flink.types.Row,因为其RowKind标志在运行时评估(evaluated )。默认情况下,不会传播事件时间和水印。该方法期望将包含所有类型更改的changelog(在org.apache.flink.types.RowKind中枚举)作为默认的ChangelogMode。
fromChangelogStream(DataStream, Schema):允许为DataStream定义类似于fromDataStream(DataStream ,schema )的schema 。否则,语义等于fromChangelogStream(DataStream)。
fromChangelogStream(DataStream, Schema, ChangelogMode):提供关于如何将stream 解释为changelog的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract行为。
toChangelogStream(Table):fromChangelogStream(DataStream)的反向操作。它生成一个包含org.apache.flink.types.Row实例的流,并在运行时为每个记录设置RowKind标志。该方法支持各种更新表。如果输入表包含单个rowtime 列(single rowtime column),则它将传播到流记录的时间戳中(stream record’s timestamp)。水印也将被传播。
toChangelogStream(Table, Schema):fromChangelogStream(DataStream,Schema)的反向操作。该方法可以丰富生成的列数据类型。如果需要,planner 可以插入隐式转换。可以将rowtime写出为元数据列。
toChangelogStream(Table, Schema, ChangelogMode):提供关于如何将表转换为变更日志流(convert a table to a changelog stream)的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract 行为。
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
由于fromChangelogStream的行为类似于fromDataStream。
此虚拟连接器还支持读取和写入流记录的rowtime 元数据。
虚拟表源实现SupportsSourceWatermark。
下面的代码展示了如何将fromChangelogStream用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
/**
* @author alanchan
*
*/
public class TestFromChangelogStreamDemo {
//the stream as a retract stream
//默认ChangelogMode应该足以满足大多数用例,因为它接受所有类型的更改。
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、创建数据源
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "alan", 12),
Row.ofKind(RowKind.INSERT, "alanchan", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "alan", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "alan", 100));
// 3、changlogstream转为table
Table table = tenv.fromChangelogStream(dataStream);
// 4、创建视图
tenv.createTemporaryView("InputTable", table);
//5、聚合查询
tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | alanchan | 5 |
// | +I | alan | 12 |
// | -D | alan | 12 |
// | +I | alan | 100 |
// +----+--------------------------------+-------------+
// 4 rows in set
env.execute();
}
//the stream as an upsert stream (without a need for UPDATE_BEFORE)
//展示了如何通过使用upsert模式将更新消息的数量减少50%来限制传入更改的类型以提高效率。
//通过为toChangelogStream定义主键和upsert changelog模式,可以减少结果消息的数量。
public static void test2() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//2、创建数据源
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "alan", 12),
Row.ofKind(RowKind.INSERT, "alanchan", 5),
Row.ofKind(RowKind.UPDATE_AFTER, "alan", 100));
// 3、转为table
Table table =
tenv.fromChangelogStream(
dataStream,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert());
// 4、创建视图
tenv.createTemporaryView("InputTable", table);
// 5、聚合查询
tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | alanchan | 5 |
// | +I | alan | 12 |
// | -U | alan | 12 |
// | +U | alan | 100 |
// +----+--------------------------------+-------------+
// 4 rows in set
env.execute();
}
public static void main(String[] args) throws Exception {
// test1();
test2();
}
}
下面的代码展示了如何将toChangelogStream用于不同的场景。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
/**
* @author alanchan
*
*/
public class TestToChangelogStreamDemo {
static final String SQL = "CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')";
//以最简单和最通用的方式转换为DataStream(无事件时间)
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、构建数据源并聚合查询
Table simpleTable = tenv
.fromValues(row("alan", 12), row("alan", 2), row("alanchan", 12))
.as("name", "score")
.groupBy($("name"))
.select($("name"), $("score").sum());
// 3、将table转成datastream,并输出
tenv
.toChangelogStream(simpleTable)
.executeAndCollect()
.forEachRemaining(System.out::println);
// +I[alanchan, 12]
// +I[alan, 12]
// -U[alan, 12]
// +U[alan, 14]
env.execute();
}
//以最简单和最通用的方式转换为DataStream(使用事件时间)
//由于`event_time`是schema的单个时间属性,因此它默认设置为流记录的时间戳;同时,它仍然是Row的一部分
public static void test2() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、建表并填入数据
tenv.executeSql(SQL);
Table table = tenv.from("GeneratedTable");
DataStream<Row> dataStream = tenv.toChangelogStream(table);
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
System.out.println(row.getFieldNames(true));
// [name, score, event_time]
// timestamp exists twice
assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
}
});
env.execute();
}
//转换为DataStream,但将time属性写出为元数据列,这意味着它不再是physical schema的一部分
public static void test3() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、建表并填入数据
tenv.executeSql(SQL);
Table table = tenv.from("GeneratedTable");
DataStream<Row> dataStream = tenv.toChangelogStream(
table,
Schema.newBuilder()
.column("name", "STRING")
.column("score", "INT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.build());
// the stream record's timestamp is defined by the metadata; it is not part of the Row
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
// prints: [name, score]
System.out.println(row.getFieldNames(true));
// timestamp exists once
System.out.println(ctx.timestamp());
}
});
env.execute();
}
//可以使用更多的内部数据结构以提高效率
//这里提到这只是为了完整性,因为使用内部数据结构增加了复杂性和额外的类型处理
//将TIMESTAMP_LTZ列转换为`Long`或将STRING转换为`byte[]`可能很方便,如果需要,结构化类型也可以表示为`Row`
public static void test4() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、建表并填入数据
tenv.executeSql(SQL);
Table table = tenv.from("GeneratedTable");
DataStream<Row> dataStream = tenv.toChangelogStream(
table,
Schema.newBuilder()
.column( "name", DataTypes.STRING().bridgedTo(StringData.class))
.column( "score", DataTypes.INT())
.column( "event_time", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
.build());
dataStream.print();
// 12> +I[1b6717eb5d93058ac3b40458a8a549a5e2fbb3b0fa146b36b7c58b5ebc1606cfc26ff9e4ebc3277832b9a8a0bfa1451d6608, 836085755, 1699941384531]
// 9> +I[6169d2f3a4766f5fce51cba66ccd33772ab72a690381563426417c75766f99de8b1fd5c3c7fc5ec48954df9299456f433fa9, -766105729, 1699941384531]
// 10> +I[e5a815e53d8fdf91b9382d7b15b6c076c5449e27b7ce505520c4334aba227d9a2fefd3333b2609704334b6fb866c244cf03d, 1552621997, 1699941384531]
env.execute();
}
public static void main(String[] args) throws Exception {
// test1();
// test2();
// test3();
test4();
}
}
示例test4()中数据类型支持哪些转换的更多信息,请参阅table API的数据类型页面。
toChangelogStream(Table).executeAndCollect()的行为等于调用Table.execute().collect()。然而,toChangelogStream(表)对于测试可能更有用,因为它允许访问DataStream API中后续ProcessFunction中生成的水印。
以上,本文介绍了Flink 的Changelog Streams与table 的集成2个示例。