一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了Flink 的insert-only流的datastream和table的相互转换三个示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
更多详细内容参考文章:
21、Flink 的table API与DataStream API 集成(完整版)
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- flink连接器 -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<!-- <scope>provided</scope> -->
</dependency>
</dependencies>
StreamTableEnvironment提供了以下方法进行datastream的转换API:
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
虚拟CREATE TABLE name(schema)WITH(options)语句中的模式部分可以自动从DataStream的类型信息中派生、丰富或完全使用org.apache.flink.table.api.Schema手动定义。
The virtual DataStream table connector exposes the following metadata for every row:
虚拟DataStream table 连接器为每一行暴露以下元数据:
Key | Data Type | Description | R/W |
---|---|---|---|
rowtime | TIMESTAMP_LTZ(3) NOT NULL | Stream record’s timestamp. | R/W |
虚拟DataStream table source实现SupportsSourceWatermark,因此允许调用source_WATERMARK()内置函数作为水印策略,以采用来自DataStream API的水印。
下面的代码展示了如何将fromDataStream用于不同的场景。其输出结果均在每个步骤的输出注释部分。
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestFromDataStreamDemo {
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、创建数据源
DataStream<User> dataStream =
env.fromElements(
new User("alan", 4, Instant.ofEpochMilli(1000)),
new User("alanchan", 6, Instant.ofEpochMilli(1001)),
new User("alanchanchn", 10, Instant.ofEpochMilli(1002)));
// 示例1、显示table的数据类型
// 说明了不需要基于时间的操作时的简单用例。
Table table = tenv.fromDataStream(dataStream);
// table.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )
// 示例2、增加一列,并显示table的数据类型
// 这些基于时间的操作应在处理时间内工作的最常见用例。
Table table2 = tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build());
// table2.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
// )
// 示例3、增加rowtime列,并增加watermark
Table table3 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
// table3.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// 示例4、增加rowtime列,并增加watermark(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)
// 基于时间的操作(如窗口或间隔联接)应成为管道的一部分时最常见的用例。
Table table4 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
// table4.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// 示例5、修改event_time类型长度,增加event_time的水印策略(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)
// 完全依赖于用户的声明。这对于用适当的数据类型替换DataStream API中的泛型类型(在Table API中是RAW)很有用。
Table table5 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("event_time", "TIMESTAMP_LTZ(3)")
.column("name", "STRING")
.column("score", "INT")
.watermark("event_time", "SOURCE_WATERMARK()")
.build());
table5.printSchema();
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` STRING,
// `score` INT
// )
env.execute();
}
public static void main(String[] args) throws Exception {
test1() ;
}
}
由于DataType比TypeInformation更丰富,我们可以轻松地启用不可变POJO和其他复杂的数据结构。
下面的Java示例显示了可能的情况。
package org.tablesql.convert;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestFromDataStreamDemo {
// user2的属性都加上了final修饰符
public static class User2 {
public final String name;
public final Integer score;
public User2(String name, Integer score) {
this.name = name;
this.score = score;
}
}
public static void test2() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//the DataStream API does not support immutable POJOs yet, the class will result in a generic type that is a RAW type in Table API by defaul
//DataStream API尚不支持不可变POJO,该类的结果默认情况下将是一个Table API中是RAW类型的泛型。
// 2、创建数据源
DataStream<User2> dataStream = env.fromElements(
new User2("Alice", 4),
new User2("Bob", 6),
new User2("Alice", 10));
// 示例1:输出表结构
Table table = tenv.fromDataStream(dataStream);
// table.printSchema();
// (
// `f0` RAW('org.tablesql.convert.TestFromDataStreamDemo$User2', '...')
// )
// 示例2:声明式输出表结构
// 在自定义模式中使用table API的类型系统为列声明更有用的数据类型,并在下面的“as”投影中重命名列
Table table2 = tenv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column("f0", DataTypes.of(User2.class))
.build())
.as("user");
// table2.printSchema();
// (
// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*
// )
//示例3:数据类型可以如上所述反射地提取或显式定义
//
Table table3 = tenv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column(
"f0",
DataTypes.STRUCTURED(
User2.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT())))
.build())
.as("user");
table3.printSchema();
// (
// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*
// )
env.execute();
}
public static void main(String[] args) throws Exception {
test2();
}
}
DataStream可以直接注册为视图。
从DataStream 创建的视图只能注册为临时视图。由于它们的内联/匿名性质,无法在永久目录(permanent catalog)中注册它们。
下面的代码展示了如何对不同的场景使用createTemporaryView。每个示例中的运行结果均在输出部分以注释展示。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author alanchan
*
*/
public class TestCreateTemporaryViewDemo {
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、创建数据源
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(Tuple2.of(12L, "alan"), Tuple2.of(0L, "alanchan"));
// 示例1:创建视图、输出表结构
tenv.createTemporaryView("MyView", dataStream);
tenv.from("MyView").printSchema();
// (
// `f0` BIGINT NOT NULL,
// `f1` STRING
// )
// 示例2:创建视图、输出表结构,使用Schema显示定义列,类似于fromDataStream的定义
//在这个例子中,输出的NOT NULL没有定义
tenv.createTemporaryView(
"MyView",
dataStream,
Schema.newBuilder()
.column("f0", "BIGINT")
.column("f1", "STRING")
.build());
tenv.from("MyView").printSchema();
// (
// `f0` BIGINT,
// `f1` STRING
// )
// 示例3:创建视图,并输出表结构
// 在创建视图前修改(或定义)列名称,as一般是指重命名,原名称是f0、f1
tenv.createTemporaryView(
"MyView",
tenv.fromDataStream(dataStream).as("id", "name"));
tenv.from("MyView").printSchema();
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
env.execute();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
test1();
}
}
下面的代码展示了如何在不同的场景中使用toDataStream。每个示例中的运行结果均在输出部分以注释展示。
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestToDataStreamDemo {
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
static final String SQL = "CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')";
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、建表
tenv.executeSql(SQL);
Table table = tenv.from("GeneratedTable");
// 示例1:table 转 datastream
// 使用默认的Row实例转换
// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印
// DataStream<Row> dataStream = tenv.toDataStream(table);
// dataStream.print();
// 以下是示例性输出,实际上是连续的数据
// 10> +I[9b979ecef142c06746ff2be0f79f4afe7ef7089f60f267184e052c12ef5f2c2a144c73d3653bee51b351ed5b20ecaf0673ec, -1424631858, 2023-11-14T02:58:56.071Z]
// 1> +I[444998c8992accc54e2c10cac4f4a976cda516d84817a8fd728c9d013da3d87e91d28537a564f09fb07308142ca83c2548e9, -1240938499, 2023-11-14T02:58:56.071Z]
// 12> +I[fa42df01fe1f789535df26f81c2e58c02feaeba60338e4cfb7c8fdb06ed96c69b46e9a966d93d0cf811b24dd9434a8ef2253, 2039663083, 2023-11-14T02:58:56.070Z]
// 1> +I[25aa121a0d656a5355c32148a0c68cc39ac05443bd7de6a0c499a2daae85868422dd024c6803598133dc26a607cd1e60e747, 1912789884, 2023-11-14T02:58:56.071Z]
// 示例2:table 转 datastream
// 从类“User”中提取数据类型,planner重新排序字段,并在可能的情况下插入隐式转换,以将内部数据结构转换为所需的结构化类型
// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印
DataStream<User> dataStream2 = tenv.toDataStream(table, User.class);
// dataStream2.print();
// 以下是示例性输出,实际上是连续的数据
// 4> TestToDataStreamDemo.User(name=e80b612e48443a292c11e28159c73475b9ef9531b91d5712420753d5d6041a06f5de634348210b151f4fc220b4ec91ed5c72, score=2146560121, event_time=2023-11-14T03:01:17.657Z)
// 14> TestToDataStreamDemo.User(name=290b48dea62368bdb35567f31e5e2690ad8b5dd50c1c0f7184f15d2e85b24ea84155f1edef875f4c96e3a2133a320fcb6e41, score=2062379192, event_time=2023-11-14T03:01:17.657Z)
// 12> TestToDataStreamDemo.User(name=a0b31a03ad951b53876445001bbc74178c9818ece7d5e53166635d40cb8ef07980eabd7463ca6be38b34b1f0fbd4e2251df0, score=16953697, event_time=2023-11-14T03:01:17.657Z)
// 示例3:table 转 datastream
// 数据类型可以如上所述反射地提取或显式定义
DataStream<User> dataStream3 =
tenv.toDataStream(
table,
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
dataStream3.print();
// 以下是示例性输出,实际上是连续的数据
// 9> TestToDataStreamDemo.User(name=49550693e3cb3a41cd785504c699684bf2015f0ebff5918dbdea454291c265d316773f2d9507ce73dd18f91a2f5fdbd6e500, score=744771891, event_time=2023-11-14T03:06:13.010Z)
// 2> TestToDataStreamDemo.User(name=60589709fe41decb647fcf4e2f91d45c82961bbe64469f3ea8a9a12b0cac071481ec9cfd65a9c218e3799986dd72ab80e457, score=-1056249244, event_time=2023-11-14T03:06:13.010Z)
// 15> TestToDataStreamDemo.User(name=d0a179f075c8b521bf5ecb08a32f6c715b5f2c616f815f8173c0a1c2961c53774faf396ddf55a44db49abe8085772f35d75c, score=862651361, event_time=2023-11-14T03:06:13.010Z)
env.execute();
}
public static void main(String[] args) throws Exception {
test1() ;
}
}
toDataStream仅支持非更新表。通常,基于时间的操作(如windows, interval joins或MATCH_RECOGNIZE子句)非常适合于在 insert-only pipelines的简单操作(如投影(projections )和过滤)。
具有生成更新的操作的管道可以使用toChangelogStream。
以上,本文介绍了Flink 的insert-only流的datastream和table的相互转换三个示例。