一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
本示例的运行结果均在执行用例中,其中用例只能在批模式下工作,用例特意说明了,如果没说明的则意味着流批模式均可。
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Executable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo.Order;
import org.tablesql.TestTableAPIJoinOperationDemo.User;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestTableAPIJoinOperationDemo2 {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class User {
private long id;
private String name;
private double balance;
private Long rowtime;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private long id;
private long user_id;
private double amount;
private Long rowtime;
}
final static List<User> userList = Arrays.asList(
new User(1L, "alan", 18, 1698742358391L),
new User(2L, "alan", 19, 1698742359396L),
new User(3L, "alan", 25, 1698742360407L),
new User(4L, "alanchan", 28, 1698742361409L),
new User(5L, "alanchan", 29, 1698742362424L)
);
final static List<Order> orderList = Arrays.asList(
new Order(1L, 1, 18, 1698742358391L),
new Order(2L, 2, 19, 1698742359396L),
new Order(3L, 1, 25, 1698742360407L),
new Order(4L, 3, 28, 1698742361409L),
new Order(5L, 1, 29, 1698742362424L),
new Order(6L, 4, 49, 1698742362424L)
);
// 创建输出表
final static String sinkSql = "CREATE TABLE sink_table (\n" +
" id BIGINT,\n" +
" user_id BIGINT,\n" +
" amount DOUBLE,\n" +
" rowtime BIGINT\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
/**
*
* @throws Exception
*/
static void testUnionBySQL() throws Exception {
// TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
DataStream<Order> orderA = env.fromCollection(orderList);
DataStream<Order> orderB = env.fromCollection(orderList);
// 将DataStream数据转Table和View,然后查询
Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
tenv.createTemporaryView("tableB", orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));
// 查询:tableA中amount>2的和tableB中amount>1的数据最后合并
// select * from tableA where amount > 2
// union
// select * from tableB where amount > 1
String sql = "select * from " + tableA + " where amount > 2 union select * from tableB where amount > 1";
Table resultTable = tenv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);// union使用toRetractStream
// String sql = "select * from " + tableA + " where amount > 2 union select * from tableB where amount > 1";
// 9> (true,TestTableAPIJoinOperationDemo2.Order(id=1, user_id=1, amount=18.0, rowtime=1698742358391))
// 8> (true,TestTableAPIJoinOperationDemo2.Order(id=2, user_id=2, amount=19.0, rowtime=1698742359396))
// 4> (true,TestTableAPIJoinOperationDemo2.Order(id=5, user_id=1, amount=29.0, rowtime=1698742362424))
// 8> (true,TestTableAPIJoinOperationDemo2.Order(id=4, user_id=3, amount=28.0, rowtime=1698742361409))
// 14> (true,TestTableAPIJoinOperationDemo2.Order(id=6, user_id=4, amount=49.0, rowtime=1698742362424))
// 6> (true,TestTableAPIJoinOperationDemo2.Order(id=3, user_id=1, amount=25.0, rowtime=1698742360407))
// toAppendStream → 将计算后的数据append到结果DataStream中去
// toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
// 类似StructuredStreaming中的append/update/complete
// TODO 3.sink
resultDS.print();
// TODO 4.execute
env.execute();
}
/**
* 和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。
* 本示例仅仅使用同一个表来演示
* 该操作只能是在批处理模式下
*
* @throws Exception
*/
static void testUnion() throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTable = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(5L, 1, 29, 1698742362424L),
row(6L, 4, 49, 1698742362424L)
));
Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table unionResult = left.union(left);
tenv.createTemporaryView("order_union_t", unionResult);
Table result = tenv.sqlQuery("select * from order_union_t");
// 下面不能转换,只有流式表可以转成流
// 出现异常:The UNION operation on two unbounded tables is currently not supported.
// DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(result, Order.class);
// resultDS.print();
//输出表
tenv.executeSql(sinkSql);
result.executeInsert("sink_table");
// +I[6, 4, 49.0, 1698742362424]
// +I[5, 1, 29.0, 1698742362424]
// +I[1, 1, 18.0, 1698742358391]
// +I[3, 1, 25.0, 1698742360407]
// +I[4, 3, 28.0, 1698742361409]
// +I[2, 2, 19.0, 1698742359396]
}
/**
* 和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。
* 本示例仅仅使用同一个表来演示
*
* @throws Exception
*/
static void testUnionAll() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
Table result = left.unionAll(left);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 14> (true,+I[5, alanchan, 29.0, 1698742362424])
// 8> (true,+I[4, alanchan, 28.0, 1698742361409])
// 5> (true,+I[1, alan, 18.0, 1698742358391])
// 10> (true,+I[1, alan, 18.0, 1698742358391])
// 11> (true,+I[2, alan, 19.0, 1698742359396])
// 6> (true,+I[2, alan, 19.0, 1698742359396])
// 7> (true,+I[3, alan, 25.0, 1698742360407])
// 13> (true,+I[4, alanchan, 28.0, 1698742361409])
// 12> (true,+I[3, alan, 25.0, 1698742360407])
// 9> (true,+I[5, alanchan, 29.0, 1698742362424])
env.execute();
}
/**
* 和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。
* 如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。
* 两张表必须具有相同的字段类型。
* 该操作只能是在批处理模式下
*
* @throws Exception
*/
static void testIntersect() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTableA = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(6L, 4, 49, 1698742362424L)
));
Table ordersTableB = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(7L, 8, 4009, 1698782362424L)
));
Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table intersectResult = left.intersect(right);
tenv.createTemporaryView("order_intersect_t", intersectResult);
Table result = tenv.sqlQuery("select * from order_intersect_t");
//输出表
tenv.executeSql(sinkSql);
result.executeInsert("sink_table");
// +I[1, 1, 18.0, 1698742358391]
}
/**
* 和 SQL INTERSECT ALL 子句类似。
* IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。
* 两张表必须具有相同的字段类型。
* 该操作只能是在批处理模式下
*
* @throws Exception
*/
static void testIntersectAll() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTableA = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(6L, 4, 49, 1698742362424L)
));
Table ordersTableB = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(7L, 8, 4009, 1698782362424L)
));
Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table intersectResult = left.intersectAll(right);
tenv.createTemporaryView("order_intersect_t", intersectResult);
Table result = tenv.sqlQuery("select * from order_intersect_t");
//输出表
tenv.executeSql(sinkSql);
result.executeInsert("sink_table");
// +I[2, 2, 19.0, 1698742359396]
// +I[1, 1, 18.0, 1698742358391]
}
/**
* 和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。
* 左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。
* 两张表必须具有相同的字段类型。
* 该操作只能是在批处理模式下
*
* @throws Exception
*/
static void testMinus() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTableA = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(6L, 4, 49, 1698742362424L)
));
Table ordersTableB = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(7L, 8, 4009, 1698782362424L)
));
Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table intersectResult = left.minus(right);
tenv.createTemporaryView("order_intersect_t", intersectResult);
Table result = tenv.sqlQuery("select * from order_intersect_t");
//输出表
tenv.executeSql(sinkSql);
result.executeInsert("sink_table");
// +I[6, 4, 49.0, 1698742362424]
}
/**
* 和 SQL EXCEPT ALL 子句类似。
* MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,
* 例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。
* 两张表必须具有相同的字段类型。
* 该操作只能是在批处理模式下
*
* @throws Exception
*/
static void testMinusAll() throws Exception {
EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
TableEnvironment tenv = TableEnvironment.create(env);
Table ordersTableA = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(6L, 4, 49, 1698742362424L)
));
Table ordersTableB = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("user_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("rowtime", DataTypes.BIGINT())
),
Arrays.asList(
row(1L, 1, 18, 1698742358391L),
row(2L, 2, 19, 1698742359396L),
row(3L, 1, 25, 1698742360407L),
row(4L, 3, 28, 1698742361409L),
row(7L, 8, 4009, 1698782362424L)
));
Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));
Table intersectResult = left.minus(right);
tenv.createTemporaryView("order_intersect_t", intersectResult);
Table result = tenv.sqlQuery("select * from order_intersect_t");
//输出表
tenv.executeSql(sinkSql);
result.executeInsert("sink_table");
// +I[6, 4, 49.0, 1698742362424]
}
/**
* 和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。
* 子查询表必须由一列组成。
* 这个列必须与表达式具有相同的数据类型。
*
* @throws Exception
*/
static void testIn() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<User> users = env.fromCollection(userList);
Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));
DataStream<Order> orders = env.fromCollection(orderList);
Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));
Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
Table right = ordersTable.select($("user_id"));
Table result = left.select($("userId"), $("name"), $("balance"),$("u_rowtime")).where($("userId").in(right));
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
resultDS.print();
// 3> (true,+I[4, alanchan, 28.0, 1698742361409])
// 12> (true,+I[1, alan, 18.0, 1698742358391])
// 15> (true,+I[3, alan, 25.0, 1698742360407])
// 12> (true,+I[2, alan, 19.0, 1698742359396])
env.execute();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testUnion();
// testUnionAll();
// testUnionBySQL();
// testIntersect();
// testIntersectAll() ;
// testMinus();
// testMinusAll();
testIn();
}
}
以上,本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版