之前的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。
在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。
转换算子一般针对某种具体操作来定义的,能拿到的信息有限。而使用底层的处理函数,则可以使用处理函数提供的“定时服务”(TimerServer) 来获取到当前当前水位线、事件等更为详细的信息,及注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。
ProcessFunction的使用就基于 DataStream 调用 .process() 方法传入一个ProcessFunction作为参数,用来定义处理逻辑。
从源码可以看到,ProcessFunction 继承了?AbstractRichFunction(抽象富函数类),两个泛类型参数代表输入类型与输出类型。里面单独定于了两个非常重要的方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。
DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用 .KeyBy() 后得到的是KeyedStream,然后调用 .window() 后得到的 WindowedStream。但是对于不同的流类型,都可以直接调用 .process() 方法进行自定义处理,此时传入的参数就叫做处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。
Flink提供了8个不同的处理函数:
(1) ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。?
(2) KeyedProcessFunction
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
(3) ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
(4) ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
(5) CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。
(6) ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
(7) BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
(8) KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。
上面提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以我们一般是先对流做 KeyBy 分区操作后,再去调用 .process() 定义具体的操作逻辑逻辑;一般传入 KeyedProcessFunction。
在.onTimer()方法中可以实现定时处理的逻辑,当之前注册的定时器被触发,则会调用该方法。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。
通过KeyedProcessFunction提供的上下文可以获取以下等内容:
ds
.keyBy( t -> t.getId())
.process(new KeyedProcessFunction<String, WaterSensor, Object>() {
/**
* 来一条数据调用一次
* @param value 当前输入的数据
* @param ctx 上下文信息
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
// 获取定时服务
TimerService timerService = ctx.timerService();
// 注册定时器:以事件时间为基准
timerService.registerEventTimeTimer(long time);
// 注册定时器:以处理时间为基准
timerService.registerProcessingTimeTimer(long time);
// 当前的处理时间:即系统时间
timerService.currentProcessingTime();
// 删除触发时间为time的处事件时间定时器
timerService.deleteEventTimeTimer(long time);
// 删除触发时间为time的处理时间定时器
timerService.deleteEventTimeTimer(long time);
// 获取当前水位线 ***获取的上一条数据的水位线
timerService.currentWatermark();
}
/**
* 时间进展到定时器注册的时间则会调用该方法
* @param timestamp 当前时间戳
* @param ctx 上下文信息
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {
}
});
TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("xxx.xxx.xxx.xxx", 1234)
.map(new MyMapFunctionImpl());
// ***定义 WaterMark 策略
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);
// ***指定 watermark策略
SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
sensorWithWaterMark
.keyBy( t -> t.getId())
.process(new KeyedProcessFunction<String, WaterSensor, Object>() {
/**
* 来一条数据调用一次
* @param value 当前输入的数据
* @param ctx 上下文信息
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
// 获取定时服务
TimerService timerService = ctx.timerService();
// 当前 Key
String currentKey = ctx.getCurrentKey();
// 数据中的事件时间
Long timestamp = ctx.timestamp();
// 注册以事件时间为基准的5s定时器
timerService.registerEventTimeTimer(5000);
System.out.println("当前 Key 为=" + currentKey + "当前时间为=" + timestamp + "注册了一个5s的定时器");
}
/**
* 时间进展到定时器注册的时间则会调用该方法
* @param timestamp 当前时间戳
* @param ctx 上下文信息
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {
// 当前 Key
String currentKey = ctx.getCurrentKey();
System.out.println("当前 Key 为=" + currentKey + "现在时间为" + timestamp + "定时器触发");
}
});
env.execute();
}
?输入:
[root@VM-55-24-centos ~]# nc -lk 1234
S1,1,1
S1,4,4
S2,3,3
S2,5,5
S3,8,8
S3,9,9
输出:
当前 Key 为=S1当前时间为=1000注册了一个5s的定时器
当前 Key 为=S1当前时间为=4000注册了一个5s的定时器
当前 Key 为=S2当前时间为=3000注册了一个5s的定时器
当前 Key 为=S2当前时间为=5000注册了一个5s的定时器
当前 Key 为=S3当前时间为=8000注册了一个5s的定时器
当前 Key 为=S3当前时间为=9000注册了一个5s的定时器 // 触发定时器 9000ms-3000ms-1ms = 5999
当前 Key 为=S1现在时间为5000定时器触发
当前 Key 为=S3现在时间为5000定时器触发
当前 Key 为=S2现在时间为5000定时器触发
在实现的 processElement() 中获取当前水位线
sensorWithWaterMark
.keyBy( t -> t.getId())
.process(new KeyedProcessFunction<String, WaterSensor, Object>() {
/**
* 来一条数据调用一次
* @param value 当前输入的数据
* @param ctx 上下文信息
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
// 获取定时服务
TimerService timerServiceService = ctx.timerService();
// 获取当前水位线
long currentWatermark = timerServiceService.currentWatermark();
System.out.println("当前数据为=" + value + "当前水位线为=" + currentWatermark );
}
});
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,3,3
s1,6,6
s1,8,8
输出:
当前数据为=WaterSensor{id='s1', ts=1, vc=1}当前水位线为=-9223372036854775808
当前数据为=WaterSensor{id='s1', ts=3, vc=3}当前水位线为=-2001
当前数据为=WaterSensor{id='s1', ts=6, vc=6}当前水位线为=-1
当前数据为=WaterSensor{id='s1', ts=8, vc=8}当前水位线为=2999
可以看到,在process中的当前水位线其实是 上一条数据的事件时间?- 水位线延迟时间 - 1ms。
- 定时服务(TimerServer)只有 KeyedStream(键控流)?才能使用
- 事件时间定时器,是通过 Watermark 触发的
- Watermark = 当前最大事件时间 - Watermark 延迟时间 - 1ms
- 在 process 中获取到的 Watermark 其实是上一条数据的?Watermark
其他处理函数类似。
案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。?
思路:直接开窗,使用全窗口函数处理窗口内所有的数据,使用HashMap存储,再对map进行统计排序输出。统计十秒内数据,五秒输出一次,其实就是滑动窗口大小为10,滑动步长为5。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("xxx.xxx.xxx.xxx", 1234)
.map(new MyMapFunctionImpl());
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);
SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
sensorWithWaterMark
// 滑动窗口,窗口大小10s,滑动步长5s
.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.process(new MyProcessAllWindowFunction())
.print();
env.execute();
}
/**
* 自定义全窗口处理函数
* 全窗口函数:窗口触发时调用一次
*/
public static class MyProcessAllWindowFunction extends ProcessAllWindowFunction<WaterSensor , String , TimeWindow>{
@Override
public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
// 定义一个hashmap用来存,key=vc,value=count值
Map<Integer, Integer> vcCountMap = new HashMap<>();
for (WaterSensor element : elements) {
Integer vc = element.getVc();
if (vcCountMap.containsKey(vc)) {
vcCountMap.put(vc, vcCountMap.get(vc) + 1);
} else {
vcCountMap.put(vc, 1);
}
}
List<Map.Entry<Integer, Integer>> list = new ArrayList<>(vcCountMap.entrySet());
// 使用Collections.sort()按value降序排序
Collections.sort(list, (o1, o2) -> o2.getValue() - o1.getValue());
System.out.println(list);
StringBuffer result = new StringBuffer();
for (int i = 0; i < Math.min(2 , list.size()); i++) {
result.append("Top " + (i+1) + ": vc = " + list.get(i).getKey() + ",出现次数 = " + list.get(i).getValue());
result.append("\n");
}
out.collect(result.toString());
}
}
}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s2,3,1
s1,5,2
s3,6,2
s1,6,1
s2,7,3
s3,8,1
s1,8,3
s2,9,2
s1,10,1
s3,11,2
s1,13,2
输出:
// 注释:[ 0 , 5 ] 的窗口
Top 1: vc = 1,出现次数 = 2
// 注释:[ 0 , 10 ] 的窗口
Top 1: vc = 1,出现次数 = 4
Top 2: vc = 2,出现次数 = 3
上面的方法使用全窗口将所有的数据都放在一个分区内,强行将并行度设置成了1,这是Flink不推荐的做法。
则可以使用KeyedProcessFunction进行优化:
1.对统计字段(vc)进行 KeyBy 分区
2.进行增量聚合,统计vc出现的次数,封装数据(vc,count,窗口标记(窗口结束数据))
3.对标记(窗口)进行分组,对数据进行排序、取TopN
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("xxx.xxx.xxx.xxx", 1234)
.map(new MyMapFunctionImpl())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
);
// 对 vc 进行分组,统计窗口内vc出现的次数 将每条数据封装成 vc,count,窗口结束时间
SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS
.keyBy(sensor -> sensor.getVc())
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(
new VcCountAgg(),
new WindowResult()
);
// 按照打的标记(windowEndTime)进行分组,保证同一个窗口时间范围的结果一起 , 分组后排序,取 TopN
windowAgg.keyBy(v -> v.f2)
.process(new TopN(2))
.print();
env.execute();
}
/**
* 增量聚合:累计同分组出现的次数
*/
public static class VcCountAgg implements AggregateFunction<WaterSensor,Integer,Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(WaterSensor waterSensor, Integer integer) {
return integer+1;
}
@Override
public Integer getResult(Integer integer) {
return integer;
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return null;
}
}
/**
* 全窗口函数,窗口内数据全部到达才会执行一次
* 泛型如下:
* 第一个:输入类型 = 增量函数的输出 count值,Integer
* 第二个:输出类型 = Tuple3(vc,count,windowEndTime) ,带上 窗口结束时间 的标签
* 第三个:key类型 , vc,Integer
* 第四个:窗口类型
*/
public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
@Override
public void process(Integer vc, ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
// 获取迭代器的数据(只有一条数据)
Integer count = elements.iterator().next();
// 获取窗口结束时间 作为标记
long endTime = context.window().getEnd();
// 将 vc、vc对应的count 连带窗口标记 返回
out.collect(Tuple3.of(vc , count , endTime));
}
}
/**
* 处理组内的每一条数据 一条数据触发一次
*/
public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
// 存不同窗口的 统计结果,key=windowEnd,value=list数据
private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
// 要取的Top数量
private int threshold;
public TopN(int threshold) {
this.threshold = threshold;
dataListMap = new HashMap<>();
}
@Override
public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
// 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存
Long windowEnd = value.f2;
// 将对应的窗口的数据放入map中对应key的list中
if(dataListMap.containsKey(windowEnd)){
// 该 vc 存在,则直接添加到数组中
dataListMap.get(windowEnd).add(value);
}else{
// 不包含vc,是该vc的第一条
List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
dataList.add(value);
dataListMap.put(windowEnd , dataList);
}
// 注册一个定时器 窗口触发时触发定时器(windowEndTime + 1ms) 输出计算结果
// 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可
ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
}
// 注册一个定时器 窗口触发时进行排序,取TopN,输出结果
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
// 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopN
Long windowEnd = ctx.getCurrentKey();
// 1. 排序
List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
dataList.sort((o1, o2) -> o2.f1 - o1.f1);
// 2. 取TopN
StringBuilder outStr = new StringBuilder();
outStr.append("================================\n");
// 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况 ==》 List中元素的个数 和 2 取最小值
for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
outStr.append("Top" + (i + 1) + "\n");
outStr.append("vc=" + vcCount.f0 + "\n");
outStr.append("count=" + vcCount.f1 + "\n");
outStr.append("窗口结束时间=" + vcCount.f2 + "\n");
outStr.append("================================\n");
}
out.collect(outStr.toString());
}
}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,1
s2,3,3
s1,4,3
s1,5,1
s2,8,1
s1,9,3
s3,11,3
s1,12,3
s1,13,3
输出:
================================
Top1
vc=1
count=2
窗口结束时间=5000
================================
Top2
vc=3
count=2
窗口结束时间=5000
================================
================================
Top1
vc=1
count=4
窗口结束时间=10000
================================
Top2
vc=3
count=3
窗口结束时间=10000
================================
侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。
具体实现上,可以在处理函数中的上下文中调用 .output() 方法即可。
案例:对每个传感器,水位超过10则输出告警信息。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("xxx.xxx.xxx.xxx", 1234)
.map(new MyMapFunctionImpl())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
);
// 定义输出流标签
OutputTag<String> warnTag = new OutputTag<String>("warn-tag", Types.STRING);
SingleOutputStreamOperator<WaterSensor> process = sensorDS
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
if (value.getVc() >= 10) {
// 使用侧输出流告警
ctx.output(warnTag, "当前水位线:" + value.getVc() + ",触发阈值10!");
}
out.collect(value);
}
});
// 输出主流
process.print("主流");
// 输出侧流
process.getSideOutput(warnTag).printToErr("侧流-Warn");
env.execute();
}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,3
s2,5,7
s1,9,10
s1,5,9
s5,11,11
输出:
?
主流> WaterSensor{id='s1', ts=1, vc=3}
主流> WaterSensor{id='s2', ts=5, vc=7}
侧流-Warn> 当前水位线:10,触发阈值10!
主流> WaterSensor{id='s1', ts=9, vc=10}
主流> WaterSensor{id='s1', ts=5, vc=9}
侧流-Warn> 当前水位线:11,触发阈值10!
主流> WaterSensor{id='s5', ts=11, vc=11}