一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文给出针对表字段的各种操作及验证。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文需要有kafka的运行环境。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
针对表的字段进行操作,具体示例如下,运行结果在源文件中。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.concat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestTableAPIOperationDemo {
static String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n"
+ " `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n"
+ " `partition` BIGINT METADATA VIRTUAL,\r\n"
+ " `offset` BIGINT METADATA VIRTUAL,\r\n"
+ " `user_id` BIGINT,\r\n"
+ " `item_id` BIGINT,\r\n"
+ " `behavior` STRING\r\n"
+ ") WITH (\r\n"
+ " 'connector' = 'kafka',\r\n"
+ " 'topic' = 'user_behavior',\r\n"
+ " 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n"
+ " 'properties.group.id' = 'testGroup',\r\n"
+ " 'scan.startup.mode' = 'earliest-offset',\r\n"
+ " 'format' = 'csv'\r\n"
+ ");";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// test1();
// test2();
test3();
}
static void test3() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 建表
tenv.executeSql(sourceSql);
Table table1 = tenv.from("Alan_KafkaTable");
// 重命名字段。
Table result = table1.as("a","b","c","d","e","f");
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
//和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。
Table table2 = result.where($("f").isEqual("login"));
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(table2, Row.class);
result2DS.print();
//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
Table table3 = result.where($("f").isNotEqual("login"));
DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(table3, Row.class);
result3DS.print();
// 没有匹配条件的记录,无输出
Table table4 = result
.filter(
and(
$("f").isNotNull(),
// $("d").isGreater(1)
$("e").isNotNull()
)
);
DataStream<Tuple2<Boolean, Row>> result4DS = tenv.toRetractStream(table4, Row.class);
result4DS.print("test filter:");
//test filter::11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
env.execute();
}
/**
* 和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。
*
* 你可以使用 row(...) 表达式创建复合行:
*
* @throws Exception
*/
static void test2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
Table table = tenv.fromValues(row(1, "ABC"), row(2L, "ABCDE"));
table.printSchema();
// (
// `f0` BIGINT NOT NULL,
// `f1` VARCHAR(5) NOT NULL
// )
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(table, Row.class);
resultDS.print();
// 1> (true,+I[2, ABCDE])
// 2> (true,+I[1, ABC])
Table table2 = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "ABCD"),
row(2L, "ABCDEF")
);
table2.printSchema();
// (
// `id` DECIMAL(10, 2),
// `name` STRING
// )
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(table2, Row.class);
result2DS.print();
// 15> (true,+I[2.00, ABCDEF])
// 14> (true,+I[1.00, ABCD])
env.execute();
}
/**
* 和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。
*
* @throws Exception
*/
static void test1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 建表
tenv.executeSql(sourceSql);
// 查询
// tenv.from("Alan_KafkaTable").execute().print();
// kafka输入数据
// 1,1002,login
// 应用程序控制台输出如下
// +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
// | op | event_time | partition | offset | user_id | item_id | behavior |
// +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
// | +I | 2023-11-01 11:00:30.183 | 0 | 2 | 1 | 1002 | login |
Table temp = tenv.from("Alan_KafkaTable");
//和 SQL 的 SELECT 子句类似。 执行一个 select 操作
Table result1 = temp.select($("user_id"), $("item_id").as("behavior"), $("event_time"));
DataStream<Tuple2<Boolean, Row>> result1DS = tenv.toRetractStream(result1, Row.class);
// result1DS.print();
// 11> (true,+I[1, 1002, 2023-11-01T11:00:30.183])
//选择星号(*)作为通配符,select 表中的所有列。
Table result2 = temp.select($("*"));
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
result2DS.print();
// 11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
env.execute();
}
static void test5() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 建表
tenv.executeSql(sourceSql);
Table table = tenv.from("Alan_KafkaTable");
//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。
Table result = table.groupBy($("user_id")).select($("user_id"), $("user_id").count().as("count(user_id)"));
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 12> (true,+I[1, 1])
env.execute();
}
static void test4() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 建表
tenv.executeSql(sourceSql);
Table table = tenv.from("Alan_KafkaTable");
//执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。
Table result2 = table.addColumns($("behavior").plus(1).as("t_col1"));
result2.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING,
// `t_col1` STRING
// )
Table result = table.addColumns($("behavior").plus(1).as("t_col3"), concat($("behavior"), "alanchan").as("t_col4"));
result.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING,
// `t_col3` STRING,
// `t_col4` STRING
// )
Table result3 = table.addColumns(concat($("behavior"), "alanchan").as("t_col4"));
result3.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING,
// `t_col4` STRING
// )
//执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。
Table result4 = result3.addOrReplaceColumns(concat($("t_col4"), "alanchan").as("t_col"));
result4.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING,
// `t_col4` STRING,
// `t_col` STRING
// )
Table result5 = result4.dropColumns($("t_col4"), $("t_col"));
result5.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING
// )
//执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。
Table result6 = result4.renameColumns($("t_col4").as("col1"), $("t_col").as("col2"));
result6.printSchema();
// (
// `event_time` TIMESTAMP(3),
// `partition` BIGINT,
// `offset` BIGINT,
// `user_id` BIGINT,
// `item_id` BIGINT,
// `behavior` STRING,
// `col1` STRING,
// `col2` STRING
// )
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(table, Row.class);
resultDS.print();
// 11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
env.execute();
}
}
以上,本文给出针对表字段的各种操作及验证。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版