https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/datastream/fault-tolerance/broadcast_state/
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
在这里我们使用一个例子来展现 broadcast state 提供的接口。
假设存在一个序列,序列中的元素是具有不同颜色与形状的图形,我们希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)。 同时,我们希望寻找的模式也会随着时间而改变。
在这个例子中,我们定义两个流,一个流包含图形(Item),具有颜色和形状两个属性。另一个流包含特定的规则(Rule),代表希望寻找的模式。
在图形流中,我们需要首先使用颜色将流进行进行分区(keyBy),这能确保相同颜色的图形会流转到相同的物理机上。
// 将图形使用颜色进行划分
KeyedStream<Item, Color> colorPartitionedStream = itemStream.keyBy(new KeySelector<Item, Color>(){...});
对于规则流,它应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。下面这段代码会完成:
i) 将规则广播给所有下游 task;
ii) 使用 MapStateDescriptor 来描述并创建 broadcast state 在下游的存储结构
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {})
);
// 广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
最终,为了使用规则来筛选图形序列,我们需要:
为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),我们可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream,具有类型方法 process(),传入一个特殊的 CoProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:
在我们的例子中,图形流是一个 keyed stream,所以我们书写的代码如下:
connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// KeyedBroadcastProcessFunction 中的类型参数表示:
// 1. key stream 中的 key 类型
// 2. 非广播流中的元素类型
// 3. 广播流中的元素类型
// 4. 结果的类型,在这里是 string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// 模式匹配逻辑
}
);
在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中,我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。
两个子类型定义如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
需要注意的是 processBroadcastElement() 负责处理广播流的元素,而 processElement() 负责处理另一个流的元素。
两个方法的第二个参数(Context)不同,均有以下方法:
在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
这两个方法的区别在于对 broadcast state 的访问权限不同。在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。 这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。
processBroadcastElement() 的实现必须在所有的并发实例中具有确定性的结果。
同时,KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,所以它提供了一些 BroadcastProcessFunction 没有的功能:
1、processElement() 的参数 ReadOnlyContext 提供了方法能够访问 Flink 的定时器服务,可以注册事件定时器(event-time timer)或者处理时间的定时器(processing-time timer)。当定时器触发时,会调用 onTimer() 方法, 提供了 OnTimerContext,它具有 ReadOnlyContext 的全部功能,并且提供:
2、processBroadcastElement() 方法中的参数 Context 会提供方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。 这个方法使用一个 KeyedStateFunction 能够对 stateDescriptor 对应的 state 中所有 key 的存储状态进行某些操作。目前 PyFlink 不支持 apply_to_keyed_state。
注册一个定时器只能在 KeyedBroadcastProcessFunction 的 processElement() 方法中进行。 在 processBroadcastElement() 方法中不能注册定时器,因为广播的元素中并没有关联的 key。
回到我们当前的例子中,KeyedBroadcastProcessFunction 应该实现如下:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
// 我们用一个数组来存储,因为同时可能有很多第一个元素正在等待
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// 与之前的 ruleStateDescriptor 相同
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
这里有一些 broadcast state 的重要注意事项,在使用它时需要时刻清楚:
没有跨 task 通讯:如上所述,这就是为什么只有在 (Keyed)-BroadcastProcessFunction 中处理广播流元素的方法里可以更改 broadcast state 的内容。 同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。
broadcast state 在不同的 task 的事件顺序可能是不同的:虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。 所以 broadcast state 的更新不能依赖于流中元素到达的顺序。
所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。
不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。
本示例是上文中的内容具体实现,即实现:
1、按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。
2、相同颜色的规则1:长方形后是三角形
3、相同颜色的规则2:正方形后是长方形
如匹配上述规则1或规则2则输出匹配成功。
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<!-- <scope>provided</scope> -->
</dependency>
</dependencies>
package org.tablesql.join;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
*
* @LastEditors: alanchan
*
* @Description: 按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。相同颜色的规则1:长方形后是三角形;规则2:正方形后是长方形
*/
public class TestJoinDimKeyedBroadcastProcessFunctionDemo {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Shape {
private String name;
private String desc;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Colour {
private String name;
private Long blue;
private Long red;
private Long green;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Item {
private Shape shape;
private Colour color;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Rule {
private String name;
private Shape first;
private Shape second;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// item 实时流
DataStream<Item> itemStream = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
// 解析item流
// 数据结构:Item[shape(name,desc);color(name,blue,red,green)]
String[] lines = o.split(";");
String[] shapeString = lines[0].split(",");
String[] colorString = lines[1].split(",");
Shape shape = new Shape(shapeString[0],shapeString[1]);
Colour color = new Colour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));
return new Item(shape,color);
});
// rule 实时流
DataStream<Rule> ruleStream = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
// 解析rule流
// 数据结构:Rule[name;shape(name,desc);shape(name,desc)]
String[] lines = o.split(";");
String name = lines[0];
String[] firstShapeString = lines[1].split(",");
String[] secondShapeString = lines[2].split(",");
Shape firstShape = new Shape(firstShapeString[0],firstShapeString[1]);
Shape secondShape = new Shape(secondShapeString[0],secondShapeString[1]);
return new Rule(name,firstShape,secondShape);
}).setParallelism(1);
// 将图形使用颜色进行划分
KeyedStream<Item, Colour> colorPartitionedStream = itemStream
.keyBy(new KeySelector<Item, Colour>() {
@Override
public Colour getKey(Item value) throws Exception {
return value.getColor();// 实现分组
}
});
colorPartitionedStream.print("colorPartitionedStream:---->");
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {
}));
// 将rule定义为广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 连接,输出流,connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// KeyedBroadcastProcessFunction 中的类型参数表示:
// 1. key stream 中的 key 类型
// 2. 非广播流中的元素类型
// 3. 广播流中的元素类型
// 4. 结果的类型,在这里是 string
new KeyedBroadcastProcessFunction<Colour, Item, Rule, String>() {
// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
// 用一个数组来存储,因为同时可能有很多第一个元素正在等待
private final MapStateDescriptor<String, List<Item>> itemMapStateDesc = new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构
private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {
}));
// 负责处理广播流的元素
@Override
public void processBroadcastElement(Rule ruleValue,
KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.Context ctx,
Collector<String> out) throws Exception {
// 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
// 查询元素的时间戳:ctx.timestamp()
// 查询目前的Watermark:ctx.currentWatermark()
// 目前的处理时间(processing time):ctx.currentProcessingTime()
// 产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)
// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);
}
// 负责处理另一个流的元素
@Override
public void processElement(Item itemValue,
KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> itemMapState = getRuntimeContext().getMapState(itemMapStateDesc);
final Shape shape = itemValue.getShape();
System.out.println("shape:"+shape);
// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
ReadOnlyBroadcastState<String, Rule> readOnlyBroadcastState = ctx.getBroadcastState(ruleStateDescriptor);
Iterable<Entry<String, Rule>> iterableRule = readOnlyBroadcastState.immutableEntries();
for (Entry<String, Rule> entry : iterableRule) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
// 初始化
List<Item> itemStoredList = itemMapState.get(ruleName);
if (itemStoredList == null) {
itemStoredList = new ArrayList<>();
}
// 比较 shape
if (shape.getName().equals(rule.second.getName()) && !itemStoredList.isEmpty()) {
for (Item item : itemStoredList) {
// 符合规则,收集匹配结果
out.collect("匹配成功: " + item + " - " + itemValue);
}
itemStoredList.clear();
}
// 规则连续性设置
if (shape.getName().equals(rule.first.getName())) {
itemStoredList.add(itemValue);
}
//
if (itemStoredList.isEmpty()) {
itemMapState.remove(ruleName);
} else {
itemMapState.put(ruleName, itemStoredList);
}
}
}
});
output.print("output:------->");
env.execute();
}
}
在netcat中启动两个端口,分别是8888和9999,8888输入规则,9999输入item,然后关键控制台输出。
red;rectangle,is a rectangle;tripe,is a tripe
green;square,is a square;rectangle,is a rectangle
# 匹配成功
rectangle,is a rectangle;red,100,100,100
tripe,is a tripe;red,100,100,100
# 匹配成功
square,is square;green,150,150,150
rectangle,is a rectangle;green,150,150,150
# 匹配不成功
tripe,is tripe;blue,200,200,200
# 匹配成功
rectangle,is a rectangle;blue,100,100,100
tripe,is a tripe;blue,100,100,100
# 匹配不成功
tripe,is a tripe;blue,100,100,100
rectangle,is a rectangle;blue,100,100,100
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle)
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe)
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=square, desc=is square), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
colorPartitionedStream:---->:3> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=200, red=200, green=200))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
output:------->:1> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。
参照上文中的依赖。
实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;
// final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
// 用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<Integer, User> broadcastDesc;
JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
this.broadcastDesc = broadcastDesc;
}
// 负责处理广播流的元素
@Override
public void processBroadcastElement(User value,
BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
Collector<Tuple2<Order, String>> out) throws Exception {
System.out.println("收到广播数据:" + value);
// 得到广播流的存储状态
ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
}
// 处理非广播流,关联维度
@Override
public void processElement(Order value,
BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
Collector<Tuple2<Order, String>> out) throws Exception {
// 得到广播流的存储状态
ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
}
}
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestJoinDimFromBroadcastDataStreamDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// user 实时流
DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
}).setParallelism(1);
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
// "RulesBroadcastState",
// BasicTypeInfo.STRING_TYPE_INFO,
// TypeInformation.of(new TypeHint<Rule>() {
// }));
// 广播流,广播规则并且创建 broadcast state
// BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 将user流(维表)定义为广播流
final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
Integer.class,
User.class);
BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);
// 需要由非广播流来进行调用
DataStream result = orderDs.connect(broadcastStream)
.process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
result.print();
env.execute();
}
// final BroadcastProcessFunction<IN1, IN2, OUT> function)
// static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
// // 用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor<Integer, User> broadcastDesc;
// JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
// this.broadcastDesc = broadcastDesc;
// }
// // 负责处理广播流的元素
// @Override
// public void processBroadcastElement(User value,
// BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
// Collector<Tuple2<Order, String>> out) throws Exception {
// System.out.println("收到广播数据:" + value);
// // 得到广播流的存储状态
// ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
// }
// // 处理非广播流,关联维度
// @Override
// public void processElement(Order value,
// BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
// Collector<Tuple2<Order, String>> out) throws Exception {
// // 得到广播流的存储状态
// ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
// out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
// }
// }
}
本示例使用的是两个socket数据源,通过netcat进行模拟。
“192.168.10.42”, 8888
// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
“192.168.10.42”, 9999
// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175
// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
以上,本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。