一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
在批处理模式下,也即有界情况下,order by 可以单独使用,排序也可以是任意字段,与一般数据库的排序结果一样。
在流模式下,也即无界的情况下,order by需要和fetch一起使用,排序字段需要有时间属性,与一般数据库的排序有点差异。
需要说明的是order by 和offset&fetch都可以在批处理模式和流模式下工作。
具体结果见下面示例
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo2.Order;
import org.tablesql.TestTableAPIJoinOperationDemo2.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIJoinOperationDemo3 {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private double balance;
private Long rowtime;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private long id;
private long user_id;
private double amount;
private Long rowtime;
}
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
final static List<Order> orderList = Arrays.asList(
new Order(1L, 1, 18, 1698742358391L),
new Order(2L, 2, 19, 1698742359396L),
new Order(3L, 1, 25, 1698742360407L),
new Order(4L, 3, 28, 1698742361409L),
new Order(5L, 1, 29, 1698742362424L),
new Order(6L, 4, 49, 1698742362424L)
);
// 创建输出表
final static String sinkSql = "CREATE TABLE sink_table (\n" +
" id BIGINT,\n" +
" user_id BIGINT,\n" +
" amount DOUBLE,\n" +
" rowtime BIGINT\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
/**
* Order By
* 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
* 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
* Sort on a non-time-attribute field is not supported.
*
* Offset & Fetch
* 和 SQL 的 OFFSET 和 FETCH 子句类似。
* Offset 操作根据偏移位置来限定(可能是已排序的)结果集。
* Fetch 操作将(可能已排序的)结果集限制为前 n 行。
* 通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。
*
* @throws Exception
*/
static void testOrderByWithUnbounded() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStream<User> users = env.fromCollection(userList)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
usersTable.printSchema();
// 从已排序的结果集中返回前3条记录
Table result = usersTable.orderBy($("rowtime").desc()).fetch(3);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
// resultDS.print();
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+I[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[3, alan, 25.0, 2023-10-31T08:52:40.407])
// (false,-D[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+I[4, alanchan, 28.0, 2023-10-31T08:52:41.409])
// (false,-D[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[5, alanchan, 29.0, 2023-10-31T08:52:42.424])
// 从已排序的结果集中返回跳过2条记录之后的所有记录
Table result2 = usersTable.orderBy($("rowtime").desc()).offset(2).fetch(4);
DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
result2DS.print();
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (false,-U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+U[3, alan, 25.0, 2023-10-31T08:52:40.407])
// (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
// (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
// (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
env.execute();
}
/**
* 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
* 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
* 这个和一般的查询数据库的结果比较类似
*
* @throws Exception
*/
static void testOrderByWithBounded() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTable = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(5L, 1, 29, 1698742362424L),
row(6L, 4, 49, 1698742362424L)
));
Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table orderByResult = left.orderBy($("amount").desc());
tenv.createTemporaryView("order_union_t", orderByResult);
Table result = tenv.sqlQuery("select * from order_union_t");
//输出表
tenv.executeSql(sinkSql);
// +I[6, 4, 49.0, 1698742362424]
// +I[5, 1, 29.0, 1698742362424]
// +I[4, 3, 28.0, 1698742361409]
// +I[3, 1, 25.0, 1698742360407]
// +I[2, 2, 19.0, 1698742359396]
// +I[1, 1, 18.0, 1698742358391]
result.executeInsert("sink_table");
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testOrderByWithUnbounded();
testOrderByWithBounded();
}
}
和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释,用 TablePipeline.execute() 来执行。
输出表必须已注册在 TableEnvironment中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。
该示例仅仅展示一个方法,运行环境和其他的示例一致,并且本示例仅仅展示的是insert Into,也可以使用execute Insert方法,在其他示例中有展示其使用。
static void testInsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<Order> orderA = env.fromCollection(orderList);
DataStream<Order> orderB = env.fromCollection(
Arrays.asList(
new Order(10L, 1, 18, 1698742358391L),
new Order(16L, 4, 49, 1698742362424L)
)
);
Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table tableB = tenv.fromDataStream(orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));
tenv.executeSql(sinkSql);
tableA.insertInto("sink_table").execute();
tableB.insertInto("sink_table").execute();
// +I[1, 1, 18.0, 1698742358391]
// +I[2, 2, 19.0, 1698742359396]
// +I[3, 1, 25.0, 1698742360407]
// +I[4, 3, 28.0, 1698742361409]
// +I[5, 1, 29.0, 1698742362424]
// +I[6, 4, 49.0, 1698742362424]
// +I[10, 1, 18.0, 1698742358391]
// +I[16, 4, 49.0, 1698742362424]
}
以上,本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版