窗口处理函数包括:ProcessWindowFunction 和 ProcessAllWindowFunction
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
这里的MyProcessWindowFunction
就是ProcessWindowFunction
的一个实现类;
ProcessWindowFunction
是一个典型的全窗口函数,把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* Deletes any state in the {@code Context} when the Window expires (the watermark passes its
* {@code maxTimestamp} + {@code allowedLateness}).
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public void clear(Context context) throws Exception {}
/** The context holding window metadata. */
public abstract class Context implements java.io.Serializable {
/** Returns the window that is being evaluated. */
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up by
* implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/** State accessor for per-key global state. */
public abstract KeyedStateStore globalState();
/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
类型参数如下:
TimeWindow
定义方法如下:
process
(窗口处理函数不是逐个处理数据)
可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context 所包含的内容也跟其他处理函数有所差别:
①不再提供设置定时器的方法
②由于当前不是只处理一个数据,所以也不再提供
.timestamp()
方法③可以通过
.window()
直接获取到当前的窗口对象④可以通过
.windowState()
和.globalState()
获取到当前自定义的窗口状态和全局状态
clear()
:
进行窗口的清理工作:如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出