一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文通过两个示例介绍了时态表TemporalTableFunction的join操作。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。
SELECT o.currency, o.amount, r.rate
o.amount * r.rate AS yen_amount
FROM
Orders AS o,
LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
就是使用静态数据实现,其验证结果在代码中的注释部分。
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
import static org.apache.flink.table.api.Expressions.$;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestTemporalTableFunctionDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Rate {
private String currency;
private Integer rate;
private Long rate_time;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private Long total;
private String currency;
private Long order_time;
}
final static List<Rate> rateList = Arrays.asList(
new Rate("US Dollar", 102, 1L),
new Rate("Euro", 114, 1L),
new Rate("Yen", 1, 1L),
new Rate("Euro", 116, 5L),
new Rate("Euro", 119, 7L)
);
final static List<Order> orderList = Arrays.asList(
new Order(2L, "Euro", 2L),
new Order(1L, "US Dollar", 3L),
new Order(50L, "Yen", 4L),
new Order(3L, "Euro", 5L)
);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// order 实时流 事实表
DataStream<Order> orderDs = env.fromCollection(orderList)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, rTimeStamp) -> order.getOrder_time()));
// rate 实时流 维度表
DataStream<Rate> rateDs = env.fromCollection(rateList)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Rate>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((rate, rTimeStamp) -> rate.getRate_time()));
// 转变为Table
Table orderTable = tenv.fromDataStream(orderDs, $("total"), $("currency"), $("order_time").rowtime());
Table rateTable = tenv.fromDataStream(rateDs, $("currency"), $("rate"), $("rate_time").rowtime());
tenv.createTemporaryView("alan_orderTable", orderTable);
tenv.createTemporaryView("alan_rateTable", rateTable);
// 定义一个TemporalTableFunction
TemporalTableFunction rateDim = rateTable.createTemporalTableFunction($("rate_time"), $("currency"));
// 注册表函数
// tenv.registerFunction("alan_rateDim", rateDim);
tenv.createTemporarySystemFunction("alan_rateDim", rateDim);
String sql = "select o.*,r.rate from alan_orderTable as o,Lateral table (alan_rateDim(o.order_time)) r where r.currency = o.currency ";
// 关联查询
Table result = tenv.sqlQuery(sql);
// 打印输出
DataStream resultDs = tenv.toAppendStream(result, Row.class);
resultDs.print();
// rate 流数据(维度表)
// rateList
// order 流数据
// orderList
// 控制台输出
// 2> +I[2, Euro, 1970-01-01T00:00:00.002, 114]
// 5> +I[50, Yen, 1970-01-01T00:00:00.004, 1]
// 16> +I[1, US Dollar, 1970-01-01T00:00:00.003, 102]
// 2> +I[3, Euro, 1970-01-01T00:00:00.005, 116]
env.execute();
}
}
本处使用的是kafka作为数据源来实现。其验证结果在代码中的注释部分。
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import static org.apache.flink.table.api.Expressions.$;
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;
public class TestJoinDimByKafkaEventTimeDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Kafka的ip和要消费的topic,//Kafka设置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
props.setProperty("group.id", "group.cyb.2");
// 读取用户信息Kafka
FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(),props);
userConsumer.setStartFromEarliest();
userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // 该句如果不加,则是默认为kafka的事件时间
);
// 读取城市维度信息Kafka
FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
cityConsumer.setStartFromEarliest();
cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
.<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // 该句如果不加,则是默认为kafka的事件时间
);
Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"),$("ts").rowtime());
tableEnv.createTemporaryView("userTable", userTable);
tableEnv.createTemporaryView("cityTable", cityTable);
// 定义一个TemporalTableFunction
TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
// 注册表函数
// tableEnv.registerFunction("dimCity", dimCity);
tableEnv.createTemporarySystemFunction("dimCity", dimCity);
Table u = tableEnv.sqlQuery("select * from userTable");
// u.printSchema();
tableEnv.toAppendStream(u, Row.class).print("user流接收到:");
Table c = tableEnv.sqlQuery("select * from cityTable");
// c.printSchema();
tableEnv.toAppendStream(c, Row.class).print("city流接收到:");
// 关联查询
Table result = tableEnv
.sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
"from userTable as u " +
", Lateral table (dimCity(u.ts)) d " +
"where u.cityId=d.cityId");
// 打印输出
DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
resultDs.print("\t关联输出:");
// 用户信息格式:
// {"userName":"user1","cityId":1,"ts":0}
// {"userName":"user1","cityId":1,"ts":1}
// {"userName":"user1","cityId":1,"ts":4}
// {"userName":"user1","cityId":1,"ts":5}
// {"userName":"user1","cityId":1,"ts":7}
// {"userName":"user1","cityId":1,"ts":9}
// {"userName":"user1","cityId":1,"ts":11}
// kafka-console-producer.sh --broker-list server1:9092 --topic user
// 城市维度格式:
// {"cityId":1,"cityName":"nanjing","ts":15}
// {"cityId":1,"cityName":"beijing","ts":1}
// {"cityId":1,"cityName":"shanghai","ts":5}
// {"cityId":1,"cityName":"shanghai","ts":7}
// {"cityId":1,"cityName":"wuhan","ts":10}
// kafka-console-producer.sh --broker-list server1:9092 --topic city
// 输出
// city流接收到::6> +I[1, beijing, 1970-01-01T00:00:00.001]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.004]
// city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.005]
// city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.007]
// city流接收到::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.009]
// user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.011]
// 关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
// 关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
// 关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
env.execute("joinDemo");
}
}
以上,本文通过两个示例介绍了时态表TemporalTableFunction的join操作。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版