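I. Flink column
The Flink column covers one topic at a time in a systematic way, with concrete examples for each.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers Flink fundamentals: terminology, architecture, the programming model, programming guides, basic DataStream API usage, the four cornerstones, and so on.
3. Flink Table API and SQL fundamentals series
This part covers basic usage of the Flink Table API and SQL: creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced and applied series
This part covers applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.
5. Flink monitoring series
This part relates to day-to-day operations and monitoring work.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. The column has no sub-directories; the link titles show what each article covers.
All articles in both columns are listed in: Flink series article index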
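This article focuses on joining dimension-table data with Flink temporal tables and implements the join in three ways.
For more systematic coverage, see the author's Flink column.
Besides the Maven dependencies, the examples in this article require a Kafka environment.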
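This topic is split into the following articles:
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: statically initialized data
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: dimension table from a third-party data source
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: broadcasting dimension data downstream
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: joining dimension data with a temporal table
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: full version (1)
[Flink extras] 15. Flink dimension tables in practice, 6 implementations: full version (2)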
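A temporal table is a view of a continuously changing table as of a particular point in time. A temporal table function is a table function that takes a time argument and returns the temporal table's view at that instant. You can map the dimension stream to a temporal table and join the fact stream against it, so that each fact row is matched with the version of the dimension data that was valid at the given time.
With this approach the dimension data can be large, it is updated in real time, no third-party storage is required, and multiple versions of the dimension data are available (which handles updates to dimension records). As of Flink 1.17, this approach is only available through the Flink SQL API.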
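To make the "view at a point in time" idea concrete, here is a minimal plain-Java sketch of a versioned lookup. It is not Flink code: the class `VersionedDimTable` and its `upsert`/`lookup` methods are hypothetical names invented for illustration, and the city values are borrowed from the Kafka example later in this article. It assumes each dimension row carries a key and a version timestamp, and that a lookup should return the newest version whose timestamp is not after the requested time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a versioned dimension table; hypothetical, not part of Flink. */
public class VersionedDimTable {
    // key -> (version start time -> value), versions sorted by time
    private final Map<Integer, TreeMap<Long, String>> versions = new HashMap<>();

    /** Records that value is valid for key from time ts onward. */
    public void upsert(int key, long ts, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, value);
    }

    /** Returns the value valid at time ts, or null if no version exists yet. */
    public String lookup(int key, long ts) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        // floorEntry picks the newest version whose start time is <= ts
        Map.Entry<Long, String> entry = history.floorEntry(ts);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedDimTable city = new VersionedDimTable();
        city.upsert(1, 1L, "beijing");
        city.upsert(1, 5L, "shanghai");
        city.upsert(1, 10L, "wuhan");
        System.out.println(city.lookup(1, 4L)); // beijing
        System.out.println(city.lookup(1, 9L)); // shanghai
        System.out.println(city.lookup(1, 0L)); // null
    }
}
```

A lookup earlier than the first version returns null, which corresponds to a fact row finding no match in an inner join.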
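Regarding the time argument: Flink has three notions of time, namely event time, processing time, and ingestion time. Event time and processing time are the ones commonly used, and this article shows an implementation for each. For event time, Kafka differs from other data sources, so the Kafka implementation is covered separately.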
package org.tablesql.join;
import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Temporal table join based on processing time
 */
public class TestJoinDimByProcessingTimeDemo {
    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Order stream (fact table)
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // User stream (dimension table)
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]),
                            Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // Convert both streams to Tables, appending a processing-time attribute to each
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_ps").proctime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"),
                $("user_ps").proctime());

        // Define a TemporalTableFunction: user_ps is the time attribute, id is the key
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_ps"), $("id"));

        // Register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // Join query
        Table result = tenv
                .sqlQuery("select o.* , u.name from " + orderTable + " as o , Lateral table (alan_userDim(o.order_ps)) u " +
                        "where o.uId = u.id");

        // Print the result
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // User stream input (dimension table): id,name,balance,age,email
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com

        // Order stream input: id,uId,total
        // 26,1002,311
        // 27,1004,334
        // 28,1005,475

        // Console output
        // 15> +I[26, 1002, 311.0, 2023-12-20T05:21:12.977Z, alanchan]
        // 11> +I[27, 1004, 334.0, 2023-12-20T05:21:50.898Z, alan_chan]
        // 5> +I[28, 1005, 475.0, 2023-12-20T05:21:57.559Z, alan_chan_chn]
        env.execute();
    }
}
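The behavior of the processing-time join above can be approximated with a small plain-Java model. This is a sketch under stated assumptions, not Flink's operator: `ProcTimeJoinModel`, `onUser`, `onOrder` and `output` are hypothetical names. It assumes the join keeps only the latest row per user id and matches each order against whatever is stored at the moment the order arrives, so an order that arrives before its user row produces no output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a processing-time temporal join; hypothetical, not part of Flink. */
public class ProcTimeJoinModel {
    // Latest known name per user id (older versions are overwritten)
    private final Map<Integer, String> latestUserName = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** A dimension row arrives: remember it as the current version. */
    public void onUser(int id, String name) {
        latestUserName.put(id, name);
    }

    /** A fact row arrives: join it with the current version, if there is one. */
    public void onOrder(int orderId, int uId, double total) {
        String name = latestUserName.get(uId);
        if (name != null) {
            output.add(orderId + "," + uId + "," + total + "," + name);
        }
        // No current version: the order is not emitted (inner-join behavior)
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeJoinModel join = new ProcTimeJoinModel();
        join.onOrder(25, 1002, 100d);   // arrives before user 1002 exists: no output
        join.onUser(1002, "alanchan");
        join.onOrder(26, 1002, 311d);   // joined with the current version
        System.out.println(join.output()); // [26,1002,311.0,alanchan]
    }
}
```

If this model matches the demo, it suggests sending the user records to port 8888 before sending orders to port 9999 when trying the example by hand.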
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Temporal table join based on event time
 */
package org.tablesql.join;
import static org.apache.flink.table.api.Expressions.$;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestjoinDimByEventTimeDemo {
    // Dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
        private Long eventTime;
    }

    // Fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
        private Long eventTime;
    }

    // Sample dimension rows; the last constructor argument is the event time in epoch milliseconds
    final static List<User> userList = Arrays.asList(
            new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com", 1L),
            new User(1002, "alan", 30d, 19, "alan.chan.chn@163.com", 10L),
            new User(1003, "alan", 29d, 25, "alan.chan.chn@163.com", 1L),
            new User(1004, "alanchan", 22d, 28, "alan.chan.chn@163.com", 5L),
            new User(1005, "alanchan", 50d, 29, "alan.chan.chn@163.com", 1698742362424L));

    // Sample fact rows; the last constructor argument is the event time in epoch milliseconds
    final static List<Order> orderList = Arrays.asList(
            new Order(11, 1002, 1084d, 1L),
            new Order(12, 1001, 84d, 10L),
            new Order(13, 1005, 369d, 2L));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Order stream (fact table); the socket-based variant is kept commented out for reference
        // DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]), Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));
        DataStream<Order> orderDs = env.fromCollection(orderList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

        // User stream (dimension table); the socket-based variant is kept commented out for reference
        // DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4], Long.valueOf(lines[5]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));
        DataStream<User> userDs = env.fromCollection(userList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

        // Convert both streams to Tables, appending an event-time (rowtime) attribute to each
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_eventTime").rowtime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"), $("user_eventTime").rowtime());
        tenv.createTemporaryView("alan_orderTable", orderTable);
        tenv.createTemporaryView("alan_userTable", userTable);

        // Define a TemporalTableFunction: user_eventTime is the time attribute, id is the key
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));

        // Register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // String sql = "select o.* from alan_orderTable as o ";
        // String sql = "select u.* from alan_userTable as u ";
        // String sql = "select o.*,u.name from alan_orderTable as o , alan_userTable as u where o.uId = u.id";
        String sql = "select o.*,u.name from alan_orderTable as o,Lateral table (alan_userDim(o.order_eventTime)) u where o.uId = u.id";

        // Join query
        Table result = tenv.sqlQuery(sql);

        // Print the result
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // User stream input (dimension table)
        // userList
        // Order stream input
        // orderList
        // Console output
        // 3> +I[12, 1001, 84.0, 1970-01-01T00:00:00.010, alan]
        env.execute();
    }
}
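Only order 12 appears in the console output above. A plain-Java replay of the sample data suggests why. This is a simplified model, not Flink code: `EventTimeJoinReplay`, `userNameAsOf` and `join` are hypothetical names, and the arrays simply mirror `userList` and `orderList`. It assumes an order matches the newest user version whose event time is not later than the order's own event time.

```java
import java.util.ArrayList;
import java.util.List;

/** Replays the sample data in plain Java; hypothetical, not part of Flink. */
public class EventTimeJoinReplay {
    // {id, eventTime} for each User row in userList, with names in a parallel array
    static final long[][] USERS = {
        {1001, 1L}, {1002, 10L}, {1003, 1L}, {1004, 5L}, {1005, 1698742362424L}};
    static final String[] USER_NAMES = {"alan", "alan", "alan", "alanchan", "alanchan"};
    // {orderId, uId, eventTime} for each Order row in orderList
    static final long[][] ORDERS = {{11, 1002, 1L}, {12, 1001, 10L}, {13, 1005, 2L}};

    /** Name of the newest user version with eventTime <= ts, or null if none exists. */
    public static String userNameAsOf(long uId, long ts) {
        String best = null;
        long bestTs = Long.MIN_VALUE;
        for (int i = 0; i < USERS.length; i++) {
            boolean sameUser = USERS[i][0] == uId;
            boolean validAtTs = USERS[i][1] <= ts;
            if (sameUser && validAtTs && USERS[i][1] >= bestTs) {
                best = USER_NAMES[i];
                bestTs = USERS[i][1];
            }
        }
        return best;
    }

    /** Inner join: orders without a valid user version are skipped. */
    public static List<String> join() {
        List<String> out = new ArrayList<>();
        for (long[] order : ORDERS) {
            String name = userNameAsOf(order[1], order[2]);
            if (name != null) {
                out.add(order[0] + "," + order[1] + "," + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join()); // [12,1001,alan]
    }
}
```

Under this reading, order 11 (event time 1) is earlier than the only version of user 1002 (event time 10), and order 13 (event time 2) is earlier than the only version of user 1005 (event time 1698742362424), so neither has a valid version to join with.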
package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}
package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}
package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {
    // Decode the Kafka record value as UTF-8 JSON and map it onto CityInfo
    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    // The stream is unbounded, so no element marks its end
    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }
}
package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {
    // Decode the Kafka record value as UTF-8 JSON and map it onto UserInfo
    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    // The stream is unbounded, so no element marks its end
    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }
}
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Temporal table join based on event time, with Kafka as the source
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

public class TestJoinDimByKafkaEventTimeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka settings: broker addresses and consumer group
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // Read user records from the "user" topic
        // (FlinkKafkaConsumer is deprecated in newer Flink releases in favor of KafkaSource)
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                // Without this assigner, the Kafka record timestamp is used as the event time
                .withTimestampAssigner((user, rTimeStamp) -> user.getTs())
        );

        // Read city dimension records from the "city" topic
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                // Without this assigner, the Kafka record timestamp is used as the event time
                .withTimestampAssigner((city, rTimeStamp) -> city.getTs())
        );

        // Convert both streams to Tables, declaring ts as the event-time (rowtime) attribute
        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"), $("ts").rowtime());
        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // Define a TemporalTableFunction: ts is the time attribute, cityId is the key
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));

        // Register the table function
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        // Print both input streams as they are received, for debugging
        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");
        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // Join query
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                        "from userTable as u " +
                        ", Lateral table (dimCity(u.ts)) d " +
                        "where u.cityId=d.cityId");

        // Print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\tjoin output:");

        // User record format:
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // kafka-console-producer.sh --broker-list server1:9092 --topic user

        // City dimension record format:
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // kafka-console-producer.sh --broker-list server1:9092 --topic city

        // Output
        // city stream received::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city stream received::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        // join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        // join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        // join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        // join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        // join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
        env.execute("joinDemo");
    }
}
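In the sample output above, the user record with ts 11 is received but never shows up in the join output. One plausible reading is that an event-time join holds a row back until the watermark reaches its timestamp. The plain-Java sketch below models that idea; it is not Flink's operator, and `WatermarkHoldBack`, `watermark` and `emittable` are hypothetical names. It assumes all listed records have been consumed, that each input's watermark is its maximum timestamp minus the out-of-orderness delay minus 1 (the bounded-out-of-orderness rule), and that the join advances on the smaller of the two input watermarks.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of watermark-driven emission; hypothetical, not part of Flink. */
public class WatermarkHoldBack {
    /** Bounded-out-of-orderness watermark: max timestamp - delay - 1 (input must be non-empty). */
    public static long watermark(long[] timestamps, long delayMillis) {
        long max = timestamps[0];
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - delayMillis - 1;
    }

    /** Fact-side timestamps that the combined (minimum) watermark has already reached. */
    public static List<Long> emittable(long[] factTs, long[] dimTs, long delayMillis) {
        long combined = Math.min(watermark(factTs, delayMillis), watermark(dimTs, delayMillis));
        List<Long> out = new ArrayList<>();
        for (long ts : factTs) {
            if (ts <= combined) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] userTs = {0, 1, 4, 5, 7, 9, 11}; // ts values of the user records
        long[] cityTs = {15, 1, 5, 7, 10};      // ts values of the city records
        // user watermark = 10, city watermark = 14, combined = 10
        System.out.println(emittable(userTs, cityTs, 0)); // [0, 1, 4, 5, 7, 9]
    }
}
```

Under these assumptions the row with ts 11 stays buffered until a later user record pushes the watermark forward, and the row with ts 0 is eligible but has no city version to match (the earliest city version starts at ts 1), which is consistent with the five joined rows shown.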
In summary, this article focused on joining dimension-table data with Flink temporal tables and implemented the join in three ways.