一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<!-- <scope>provided</scope> -->
</dependency>
</dependencies>
实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;
// final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
// 用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<Integer, User> broadcastDesc;
JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
this.broadcastDesc = broadcastDesc;
}
// 负责处理广播流的元素
@Override
public void processBroadcastElement(User value,
BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
Collector<Tuple2<Order, String>> out) throws Exception {
System.out.println("收到广播数据:" + value);
// 得到广播流的存储状态
ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
}
// 处理非广播流,关联维度
@Override
public void processElement(Order value,
BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
Collector<Tuple2<Order, String>> out) throws Exception {
// 得到广播流的存储状态
ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
}
}
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestJoinDimFromBroadcastDataStreamDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// user 实时流
DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
}).setParallelism(1);
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
// "RulesBroadcastState",
// BasicTypeInfo.STRING_TYPE_INFO,
// TypeInformation.of(new TypeHint<Rule>() {
// }));
// 广播流,广播规则并且创建 broadcast state
// BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 将user流(维表)定义为广播流
final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
Integer.class,
User.class);
BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);
// 需要由非广播流来进行调用
DataStream result = orderDs.connect(broadcastStream)
.process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
result.print();
env.execute();
}
// final BroadcastProcessFunction<IN1, IN2, OUT> function)
// static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
// // 用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor<Integer, User> broadcastDesc;
// JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
// this.broadcastDesc = broadcastDesc;
// }
// // 负责处理广播流的元素
// @Override
// public void processBroadcastElement(User value,
// BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
// Collector<Tuple2<Order, String>> out) throws Exception {
// System.out.println("收到广播数据:" + value);
// // 得到广播流的存储状态
// ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
// }
// // 处理非广播流,关联维度
// @Override
// public void processElement(Order value,
// BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
// Collector<Tuple2<Order, String>> out) throws Exception {
// // 得到广播流的存储状态
// ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
// out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
// }
// }
}
本示例使用的是两个socket数据源,通过netcat进行模拟。
“192.168.10.42”, 8888
// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
“192.168.10.42”, 9999
// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175
// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
以上,本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。