处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。
Flink提供了8种不同处理函数:
ProcessFunction
:dataStreamKeyedProcessFunction
:用于KeyedStream,keyBy之后的流处理CoProcessFunction
:用于connect连接的流ProcessJoinFunction
:用于join流操作BroadcastProcessFunction
:用于广播KeyedBroadcastProcessFunction
:keyBy之后的广播ProcessWindowFunction
:窗口增量聚合ProcessAllWindowFunction
:全窗口聚合@Public
public interface RichFunction extends Function {
void open(Configuration var1) throws Exception;
void close() throws Exception;
RuntimeContext getRuntimeContext();
IterationRuntimeContext getIterationRuntimeContext();
void setRuntimeContext(RuntimeContext var1);
}
处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。它所对应的函数 类,就叫作 ProcessFunction
。
在很多应 用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。就必须使用处理函数进行实现。
处理函数提供了一个“定时服务” (TimerService
),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线 (watermark),甚至可以注册“定时事件”。处理函数继承了 AbstractRichFunction
抽象类, 所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函 数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方 法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()
方法 就可以了。ProcessFunction
不是接口,而是一个抽象类,继承了 AbstractRichFunction
; 所以所有的处理函数,都是富函数(RichFunction), 富函数可以调用的东西这里同样都可以调用。
PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public ProcessFunction() {
}
public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
}
//
public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
public OnTimerContext() {
super();
}
public abstract TimeDomain timeDomain();
}
//
public abstract class Context {
public Context() {
}
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> var1, X var2);
}
}
processElement
会针对流中的每条记录调用一次。跟MapFunction
一样,Collector
发送出去。
Context
可以访问时间戳,当前记录键值以及TimeService
,支持将结果发送到副输出。
onTimer()
是一个回调函数,会在之前注册的计数器触发时调用。timestamp
参数给出了所触发计时器的时间戳,Collector
可用来发出记录。
OnTimerContext
能够提供和processElement()
方法中Context
对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。
Context
和OnTimerContext
对象中TimerService
提供时间相关的数据访问。
PublicEvolving
public interface TimerService {
/** Error string for {@link UnsupportedOperationException} on registering timers. */
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
/** Error string for {@link UnsupportedOperationException} on deleting timers. */
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
/** 返回当前的处理时间。 */
long currentProcessingTime();
/** 返回当前水位线时间戳。 */
long currentWatermark();
/**
针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。
*/
void registerProcessingTimeTimer(long time);
/**
* 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。
*/
void registerEventTimeTimer(long time);
/**
* 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。
*/
void deleteProcessingTimeTimer(long time);
/**
* 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。
*/
void deleteEventTimeTimer(long time);
}
计时器触发时会调用
onTimer()
回调函数,系统对于processElement()
和onTimer()
两个方法调用同步,防止并发。每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。
大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有split
算子可以将一条流拆分成多条类型相同的流。
处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。
按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行。
PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
public abstract class Context {
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
/** Get key of the element being processed. */
public abstract K getCurrentKey();
}
public abstract class OnTimerContext extends Context {
/** The {@link TimeDomain} of the firing timer. */
public abstract TimeDomain timeDomain();
/** Get key of the firing timer. */
@Override
public abstract K getCurrentKey();
}
}
除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(ProcessWindowsFunction
),另一种是全窗口处理函数(ProcessAllWindowsFunction
),ProcessWindowFunction
获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。
ProcessWindowsFunction
:处理分区数据,每个窗口执行一次process
方法.
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public abstract void process(
KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
public void clear(Context context) throws Exception {}
/** The context holding window metadata. */
public abstract class Context implements java.io.Serializable {
/** Returns the window that is being evaluated. */
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
ProcessAllWindowFunction
和ProcessFunction
类相似,都是用来对上游过来的元素做处理,不过ProcessFunction
是每个元素执行一次processElement
方法,ProcessAllWindowFunction
是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);
public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window>
extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out)
throws Exception;
public void clear(Context context) throws Exception {}
/** The context holding window metadata. */
public abstract class Context {
public abstract W window();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
对于连接流ConnectedStreams
的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function
)。与 CoMapFunction
类似,如果是调用.flatMap()
就需要传入一个 CoFlatMapFunction
,需要实现 flatMap1()
、flatMap2()
两个方法;而调用.process()
时,传入的则是一个 CoProcessFunction
。
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out)
throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
public abstract class Context {
public abstract Long timestamp();
/** A {@link TimerService} for querying time and registering timers. */
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
public abstract class OnTimerContext extends Context {
public abstract TimeDomain timeDomain();
}
}
ProcessJoinFunction
和CoProcessFunction
类似,但是有区别。
ProcessJoinFunction
用于join流操作,可以拿到两个流数据处理
CoProcessFunction
用于连接流处理,两个流数据分别处理
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = -2444626938039012398L;
public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out)
throws Exception;
public abstract class Context {
public abstract long getLeftTimestamp();
public abstract long getRightTimestamp();
public abstract long getTimestamp();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
广播连接流处理函数,基于 BroadcastConnectedStream
调用.process()
时作为参数传入。这里的“广播连接流” BroadcastConnectedStream
,是一个未 keyBy
的普通 DataStream 与一个广播流(BroadcastStream
)做连接(conncet
)之后的产物。
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = 8352559162119034453L;
public abstract void processElement(
final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(
final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
public abstract class Context extends BaseBroadcastProcessFunction.Context {}
public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}
按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream
调用.process()
时作为参数传入。与 BroadcastProcessFunction
不同的是,这时的广播连接流, 是一个 KeyedStream
与广播流(BroadcastStream
)做连接之后的产物。