目录
1.1分为 按键分区(Keyed)和非按键分区(Non-Keyed)
3.1增量聚合函数( ReduceFunction / AggregateFunction )
3.2全窗口函数( full window functions )
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。
每个key上都定义了一组窗口,各自独立地进行统计计算。
需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...)
???????.window(...)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
没有keyby的窗口: 窗口内的所有数据进入同一个子任务,并行度值只能为1
stream.windowAll(...)
窗口操作主要有两个部分 : 窗口分配器(Window Assigners) 和 窗口函数(Window Functions)
stream.keyBy(<key selector>)
???????.window(<window assigner>)
???????.aggregate(<window function>)
定义窗口分配器,调用window()方法。
传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口
时间窗口又可以分为滚动、滑动和会话三种
stream.keyBy(...)
???????.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
???????.aggregate(...)
stream.keyBy(...)
???????.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
???????.aggregate(...)
stream.keyBy(...)
???????.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
???????.aggregate(...)
stream.keyBy(...)
???????.window(TumblingEventTimeWindows.of(Time.seconds(5)))
???????.aggregate(...)
stream.keyBy(...)
???????.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
???????.aggregate(...)
stream.keyBy(...)
???????.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
???????.aggregate(...)
基于全局窗口(Global Window)实现,直接调用countWindow()方法
传入一个长整型的参数表示窗口的大小
stream.keyBy(...)
???????.countWindow(10)
此处定义一个长度为10的滚动计数窗口,当窗口元素数量达到10时,触发计算执行并关闭窗口。
stream.keyBy(...)
???????.countWindow(10,3)
countWindow(窗口大小,滑动步长)
此处创建的窗口长度为10,步长为3,每个窗口统计10个数据,每隔3个输出一次结果
全局窗口一般在自定义窗口时使用
stream.keyBy(...)
???????.window(GlobalWindows.create());
GlobalWindows.create() 使用全局窗口,必须自定义触发器才能实现窗口计算。
????????窗口后续的计算操作就是窗口函数
????????根据处理方式分为增量聚合函数和全窗口函数
????????将收集起来的数据进行聚合
????????ReduceFunction? 和? AggregateFunction
ReduceFunction必须输入输出的类型一样
public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; } // 示例: 获取一段时间内每个用户浏览的商品的最大价值的那条记录(ReduceFunction) kafkaStream // 将从Kafka获取的JSON数据解析成Java Bean .process(new KafkaProcessFunction()) // 提取时间戳生成水印 .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) // 按用户分组 .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) // 构造TimeWindow .timeWindow(Time.seconds(windowLengthSeconds)) // 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录 .reduce(new ReduceFunction<UserActionLog>() { @Override public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception { return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2; } }) .print();
# 结果
UserActionLog{userID='user_4', eventTime='2019-11-09 12:51:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_2', eventTime='2019-11-09 12:51:29', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-09 12:51:22', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_5', eventTime='2019-11-09 12:51:21', eventType='browse', productID='product_3', productPrice=30}
AggregateFunction 不需要输入输出类型一样 , 需要传入一个AggregateFunction的实现类
接口中有四个方法:
首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。
? ? ? ? 全窗口函数有两种:WindowFunction 和 ProcessWindowFunction
通过WindowFunction调用 apply ( ) 方法,传入一个WindowFunction的实现类
stream
????.keyBy(<key selector>)
????.window(<window assigner>)
????.apply(new MyWindowFunction());
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
控制窗口什么时候触发计算。
通过 WindowedStream 调用 trigger ( ) 方法 传入一个自定义的窗口触发器
stream.keyBy(...)
???????.window(...)
???????.trigger(new MyTrigger())
定义移除某些数据的逻辑
通过 WindowedStream 调用 evictor ( ) 方法 传入自定义的移除器
stream.keyBy(...)
???????.window(...)
???????.evictor(new MyEvictor())