一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文给出了关于表数据的聚合操作示例,比如group by、distinct、以及group by、over、distinct的窗口聚合示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
本示例内容较多,下文是本部分示例的公共代码部分。
本部分仅仅就是用的公共对象,比如User的定义,和需要引入的包。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIOperationDemo2 {
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private int balance;
private Long rowtime;
}
}
本示例仅仅展示了group by操作,比较简单。
static void test2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 建表
tenv.executeSql(sourceSql);
Table table = tenv.from("Alan_KafkaTable");
//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。
Table result = table.groupBy($("user_id")).select($("user_id"), $("user_id").count().as("count(user_id)"));
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 12> (true,+I[1, 1])
env.execute();
}
使用分组窗口结合单个或者多个分组键对表进行分组和聚合。
static void test3() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
)
;
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
//使用分组窗口结合单个或者多个分组键对表进行分组和聚合。
Table result = usersTable
.window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口
.groupBy($("name"), $("w")) // 按窗口和键分组
// 访问窗口属性并聚合
.select(
$("name"),
$("w").start(),
$("w").end(),
$("w").rowtime(),
$("balance").sum().as("sum(balance)")
);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 2> (true,+I[alan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 62])
// 16> (true,+I[alanchan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 57])
env.execute();
}
和 SQL 的 OVER 子句类似。
static void test4() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
//所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。
//尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。
Table result = usersTable
// 定义窗口
.window(
Over
.partitionBy($("name"))
.orderBy($("rowtime"))
.preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE))
.following(unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE))
.as("w"))
// 滑动聚合
.select(
$("id"),
$("balance").avg().over($("w")),
$("balance").max().over($("w")),
$("balance").min().over($("w"))
);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 2> (true,+I[1, 18, 18, 18])
// 16> (true,+I[4, 28, 28, 28])
// 2> (true,+I[2, 18, 19, 18])
// 16> (true,+I[5, 28, 29, 28])
// 2> (true,+I[3, 20, 25, 18])
env.execute();
}
/**
* 和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。
* Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值。
* Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。
* @throws Exception
*/
static void test5() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
// 按属性分组后的的互异(互不相同、去重)聚合
Table groupByDistinctResult = usersTable
.groupBy($("name"))
.select($("name"), $("balance").sum().distinct().as("sum_balance"));
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(groupByDistinctResult, Row.class);
// resultDS.print();
// 2> (true,+I[alan, 18])
// 16> (true,+I[alanchan, 28])
// 16> (false,-U[alanchan, 28])
// 2> (false,-U[alan, 18])
// 16> (true,+U[alanchan, 57])
// 2> (true,+U[alan, 37])
// 2> (false,-U[alan, 37])
// 2> (true,+U[alan, 62])
//按属性、时间窗口分组后的互异(互不相同、去重)聚合
Table groupByWindowDistinctResult = usersTable
.window(Tumble
.over(lit(5).minutes())
.on($("rowtime"))
.as("w")
)
.groupBy($("name"), $("w"))
.select($("name"), $("balance").sum().distinct().as("sum_balance"));
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(groupByDistinctResult, Row.class);
// result2DS.print();
// 16> (true,+I[alanchan, 28])
// 2> (true,+I[alan, 18])
// 16> (false,-U[alanchan, 28])
// 2> (false,-U[alan, 18])
// 16> (true,+U[alanchan, 57])
// 2> (true,+U[alan, 37])
// 2> (false,-U[alan, 37])
// 2> (true,+U[alan, 62])
//over window 上的互异(互不相同、去重)聚合
Table result = usersTable
.window(Over
.partitionBy($("name"))
.orderBy($("rowtime"))
.preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE))
.as("w"))
.select(
$("name"), $("balance").avg().distinct().over($("w")),
$("balance").max().over($("w")),
$("balance").min().over($("w"))
);
DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result, Row.class);
result3DS.print();
// 16> (true,+I[alanchan, 28, 28, 28])
// 2> (true,+I[alan, 18, 18, 18])
// 2> (true,+I[alan, 18, 19, 18])
// 16> (true,+I[alanchan, 28, 29, 28])
// 2> (true,+I[alan, 20, 25, 18])
env.execute();
}
用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果计算不同(互异、去重的)值的聚合结果,则只需向聚合函数添加 distinct 修饰符即可。
Table orders = tEnv.from("Orders");
// 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users")
.select(
$("users"),
call("myUdagg", $("points")).distinct().as("myDistinctResult")
);
和 SQL 的 DISTINCT 子句类似。 返回具有不同组合值的记录。
static void test6() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L),
new User(5L, "alanchan", 29, 1698742362424L)
);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
// Table orders = tableEnv.from("Orders");
Table result = usersTable.distinct();
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 数据集有6条记录,并且有一条是重复的,故只输出5条
// 9> (true,+I[2, alan, 19, 2023-10-31T08:52:39.396])
// 1> (true,+I[1, alan, 18, 2023-10-31T08:52:38.391])
// 13> (true,+I[3, alan, 25, 2023-10-31T08:52:40.407])
// 7> (true,+I[4, alanchan, 28, 2023-10-31T08:52:41.409])
// 13> (true,+I[5, alanchan, 29, 2023-10-31T08:52:42.424])
env.execute();
}
以上,本文给出了关于表数据的聚合操作示例,比如group by、distinct、以及group by、over、distinct的窗口聚合示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版