Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

发布时间:2024年01月14日

学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》

学习笔记如下:


窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。

Keyed Windows

在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(...)

stream
       .keyBy(...)               //  仅 keyed 窗口需要
       .window(...)              //  必填项:"assigner"
      [.trigger(...)]            //  可选项:"trigger" (省略则使用默认 trigger)
      [.evictor(...)]            //  可选项:"evictor" (省略则不使用 evictor)
      [.allowedLateness(...)]    //  可选项:"lateness" (省略则为 0)
      [.sideOutputLateData(...)] //  可选项:"output tag" (省略则不对迟到数据使用 side output)
       .reduce/aggregate/apply() //  必填项:"function"
      [.getSideOutput(...)]      //  可选项:"output tag"

在 non-keyed windows 上使用窗口时,直接调用 windowAll(...)

stream
       .windowAll(...)           //  必填项:"assigner"
      [.trigger(...)]            //  可选项:"trigger" (else default trigger)
      [.evictor(...)]            //  可选项:"evictor" (else no evictor)
      [.allowedLateness(...)]    //  可选项:"lateness" (else zero)
      [.sideOutputLateData(...)] //  可选项:"output tag" (else no side output for late data)
       .reduce/aggregate/apply() //  必填项:"function"
      [.getSideOutput(...)]      //  可选项:"output tag"

窗口的生命周期

窗口会在第一个属于它的元素到达时被创建,然后在时间超过窗口的 “结束时间戳 + 允许的延迟时间” 时被完全删除。Flink 仅保证删除基于时间的窗口。

例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么当第一个元素落入 12:0012:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。

  • window trigger:定义何时窗口中的数据可以被 function 计算;在窗口被创建后、删除前的这段时间内定义如何清理窗口中的数据。
  • window function:定义如何计算窗口中的内容。
  • window evictor:在 trigger 触发之后,在窗口函数的前后删除数据。

Keyed 和 Non-Keyed Windows

首先必须要在定义窗口前确定数据流是 keyed 还是 non-keyed。

使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理,属于同一个 key 的元素会被发送到同一个 task。

对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream,所以所有的窗口计算会被同一个 task 完成,也就是并行度为 1。

Window Assigners

指定了你的 stream 是否为 keyed 之后,下一步就是定义 window assigner。

Window assigner 定义了 stream 中的元素如何被分发到各个窗口。 可以在 window(...)windowAll(...) 中指定一个 WindowAssigner

基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。 在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow, 它有查询开始和结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()

滚动窗口(Tumbling Windows)

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

例如:指定滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。

在这里插入图片描述

示例:使用滚动窗口的样例

// 滚动 event-time 窗口
input
 .keyBy(<key selector>)
 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
 .<windowed transformation>(<window function>);
// 滚动 processing-time 窗口
input
 .keyBy(<key selector>)
 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
 .<windowed transformation>(<window function>);
// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input
 .keyBy(<key selector>)
 .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
 .<windowed transformation>(<window function>);

滚动窗口有两个参数:

  • 第一个是窗口长度(即两个窗口之间的时间间隔),其中的时间间隔可以用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来指定。
  • 第二个是窗口的偏移量(offset)。在不设置偏移量时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999 等。而如果设置了 15 分钟的偏移量,则会得到 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999 等。
滑动窗口(Sliding Windows)

滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 但是,滑动窗口还可以指定滑动距离(window slide)参数来控制生成新窗口的频率。 如果滑动距离(slide)小于窗口大小,则滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

例如:滑动窗口指定大小为 10 分钟、滑动距离 5 分钟的窗口,则每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

示例:使用滑动窗口

// 滑动 event-time 窗口
input
 .keyBy(<key selector>)
 .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
 .<windowed transformation>(<window function>);
// 滑动 processing-time 窗口
input
 .keyBy(<key selector>)
 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
 .<windowed transformation>(<window function>);
// 滑动 processing-time 窗口,偏移量为 -8 小时
input
 .keyBy(<key selector>)
 .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
 .<windowed transformation>(<window function>);

滑动窗口有 3 个参数:

  • 第 1 个是窗口长度,指定时间间隔的方法与滚动窗口一致
  • 第 2 个是滑动距离
  • 第 3 个是偏移量(offset),设置方法与滚动窗口一致
会话窗口(Session Windows)

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据(即在一段不活跃的间隔)之后会关闭。

会话窗口的 assigner 可以设置固定的会话间隔(session gap)或用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

示例:使用会话窗口

// 设置了固定间隔的 event-time 会话窗口
input
 .keyBy(<key selector>)
 .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
 .<windowed transformation>(<window function>);
// 设置了动态间隔的 event-time 会话窗口
input
 .keyBy(<key selector>)
 .window(EventTimeSessionWindows.withDynamicGap((element) -> {
     // 决定并返回会话间隔
 }))
 .<windowed transformation>(<window function>);
// 设置了固定间隔的 processing-time session 窗口
input
 .keyBy(<key selector>)
 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
 .<windowed transformation>(<window function>);
// 设置了动态间隔的 processing-time 会话窗口
input
 .keyBy(<key selector>)
 .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
     // 决定并返回会话间隔
 }))
 .<windowed transformation>(<window function>);

会话窗口有 2 种参数:

  • 固定间隔:指定时间间隔的方法与滚动窗口一致
  • 动态间隔:通过实现 SessionWindowTimeGapExtractor 接口来指定。

会话窗口并没有固定的开始或结束时间,在 Flink 内部,会话窗口的算子会为每一条数据创建一个窗口,然后将距离不超过预设间隔的窗口合并。

全局窗口(Global Windows)

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在指定了自定义的 trigger 时有效,否则计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

在这里插入图片描述

示例:使用全局窗口

DataStream<T> input = ...;

input
 .keyBy(<key selector>)
 .window(GlobalWindows.create())
 .<windowed transformation>(<window function>);

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了。关于窗口如何触发,详见 triggers

窗口函数有三种:ReduceFunctionAggregateFunctionProcessWindowFunction。 前两者执行起来更高效(详见 State Size)因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。

使用 ProcessWindowFunction 的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。 ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 合并来提高效率。 这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。 我们接下来看看每种函数的例子。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

示例:使用 ReduceFunction 对元组的第二个属性求和

DataStream<Tuple2<String, Long>> input = ...;

input
 .keyBy(<key selector>)
 .window(<window assigner>)
 .reduce(new ReduceFunction<Tuple2<String, Long>>() {
   public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
     return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
   }
 });
AggregateFunction

AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT),其中输入数据的类型是输入流的元素类型。

AggregateFunction 接口有如下方法:

  • createAccumulator():创建初始累加器
  • add:将一条元素累加进累加器
  • merge:合并两个累加器
  • getResult:从累加器中提取输出(OUT 类型)

可以看到,ReduceFunctionAggregateFunction 的特殊情况。与 ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。

示例:使用 AggregateFunction 计算窗口中所有元素的元组的第二个属性的平均值

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
ProcessWindowFunction

ProcessWindowFunction 可以获取包含窗口内所有元素的 Iterable,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。但是,ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的,因为窗口中的数据无法被增量聚合,所以需要在窗口触发前缓存所有数据。

ProcessWindowFunction 抽象类的源码如下:

// flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
     * {@code maxTimestamp} + {@code allowedLateness}).
     *
     * @param context The context to which the window is being evaluated
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}

key 参数由 keyBy() 中指定的 KeySelector 选出。

示例:使用 ProcessWindowFunction 对窗口中的元素技术,并且将窗口本身的信息一同输出

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
增量聚合的 ProcessWindowFunction

通过将 ProcessWindowFunctionReduceFunctionAggregateFunction 搭配使用,可以将数据在到达窗口的时候进行增量聚合,当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。这样就可实现增量聚合窗口,同时从 ProcessWindowFunction 中获得窗口的结果数据。

示例:使用 ReduceFunction 增量聚合。将 ReduceFunctionProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

示例:使用 AggregateFunction 增量聚合。将 AggregateFunctionProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state,ProcessWindowFunction 还可以使用作用域仅为 “当前正在处理的窗口” 的 keyed state。不同的窗口、不同的 key 都会有自己不同的 per-window state。

process() 接收到的 Context 对象中,有两个方法允许我们访问以下两种 state:

  • globalState(),访问全局的 keyed state
  • windowState(),访问作用域仅限于当前窗口的 keyed state

这尤其适用于一个 window 被触发多次的情况(例如出现延迟数据再次触发窗口计算,或自定义了提前触发窗口的 trigger)。

当使用窗口状态时,需要注意在删除窗口时清除这些状态,具体地,它们应该定义在 clear() 方法中。

WindowFunction(已过时)

在某些可以使用 ProcessWindowFunction 的地方,你也可以使用 WindowFunction。 它是旧版的 ProcessWindowFunction,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。 这个接口会在未来被弃用。

WindowFunction 的源码如下:

// flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

示例:使用 WindowFunction

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

Triggers

Trigger 决定了一个窗口何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,则可以在 trigger(...) 调用中指定自定义的 trigger。

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement():在每个元素被加入窗口时调用
  • onEventTime():在注册的 event-time timer 触发时调用
  • onProcessingTime():在注册的 processing-time timer 触发时调用
  • onMerge():与有状态的 trigger 相关。该方法会在两个窗口合并时,将窗口对应 trigger 的状态进行合并(例如使用会话窗口时)。
  • clear():在对应窗口被移除时调用。

前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:

  • CONTINUE:什么也不做
  • FIRE:触发计算
  • PURGE:清空窗口内的元素
  • FIRE_AND_PURGE:触发计算,计算结束后清空窗口内的元素
触发(Fire)与清除(Purge)

当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIREFIRE_AND_PURGE。 这是让窗口算子发送当前窗口计算结果的信号。如果一个窗口指定了 ProcessWindowFunction,所有的元素都会传给 ProcessWindowFunction。 如果是 ReduceFunctionAggregateFunction,则直接发送聚合的结果。

当 trigger 触发时,它可以返回 FIREFIRE_AND_PURGEFIRE 会保留被触发的窗口中的内容,而 FIRE_AND_PURGE 会删除这些内容。 Flink 内置的 trigger 默认使用 FIRE,不会清除窗口的状态。

WindowAssigner 默认的 Triggers

WindowAssigner 默认的 Trigger 足以应付诸多情况。 例如,所有的 event-time window assigner 都默认使用 EventTimeTrigger。 这个 trigger 会在 watermark 越过窗口结束时间后直接触发。

GlobalWindow 的默认 trigger 是永远不会触发的 NeverTrigger。因此,使用 GlobalWindow 时,必须自己定义一个 trigger。

内置 Triggers 和自定义 Triggers

Flink 包含一些内置 trigger:

  • EventTimeTrigger:根据 watermark 测量的 event time 触发
  • ProcessingTimeTrigger:根据 processing time 触发
  • CountTrigger:在窗口中的元素超过预设的限制时触发
  • PurgingTrigger:接收另一个 trigger 并将它转换成一个会清理数据的 trigger

如果你需要实现自定义的 trigger,可以考虑继承抽象类 Trigger。

Trigger 抽象类的源码位置:flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

Evictors

Flink 的窗口模型允许在 WindowAssignerTrigger 之外指定可选的 Evictor。 如本文开篇的代码中所示,通过 evictor(...) 方法传入 Evictor。 Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 Evictor 接口提供了两个方法实现此功能:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。

Flink 有三个内置的 evictor:

  • CountEvictor:仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除
  • DeltaEvictor:接收 DeltaFunctionthreshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。
  • TimeEvictor:接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

Flink 对窗口中元素的顺序不做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。

Allowed Lateness

在默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素,依旧会被加入窗口。一个迟到但没有被丢弃的元素是否会再次触发窗口,取决于窗口的 trigger,比如 EventTimeTrigger

为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除。

示例:指定 allowed lateness

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

当使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE

从旁路输出(side output)获取迟到数据

可以通过 Flink 的旁路输出功能,获得迟到数据的数据流。

示例:获取延迟数据并添加到 lateStream 数据流

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
迟到数据的一些考虑

当指定了大于 0 的 allowed lateness 时,窗口本身以及其中的内容仍会在 watermark 越过窗口末端后保留。这时,如果一个迟到但未被丢弃的数据到达,它可能会再次触发这个窗口。这种触发被称作 late firing,与表示第一次触发窗口的 main firing 相区别。

如果是使用会话窗口的情况,late firing 可能会进一步合并已有的窗口,因为他们可能会连接现有的、未被合并的窗口。

需要注意的是,late firing 发出的元素应该被视作对之前计算结果的更新,即你的数据流中会包含一个相同计算任务的 多个结果。你的应用需要考虑到这些重复的结果,或去除重复的部分。

Working with window results

窗口操作的结果会变回 DataStream,并且窗口操作的信息不会保存在输出的元素中。所以如果想要保留窗口的 meta-information,则需要在 ProcessWindowFunction 里手动将他们放入输出的元素中。输出元素中保留的唯一相关的信息是元素的 timestamp。 它被设置为窗口能允许的最大 timestamp,也就是 end timestamp - 1。也就是说,在窗口操作之后,元素总是会携带一个 event-time 或 processing-time timestamp。 对 Processing-time 窗口来说,这并不意味着什么。 而对于 event-time 窗口来说,“输出携带 timestamp” 以及 “watermark 与窗口的相互作用” 这两者使建立窗口大小相同的连续窗口操作(consecutive windowed operations) 变为可能。我们先看看 watermark 与窗口的相互作用,然后再来讨论它。

watermarks 与窗口的交互

当 watermark 到达窗口算子时,它触发了两件事:

  • 这个 watermark 触发了所有最大 timestamp(即 end-timestamp - 1)小于它的窗口
  • 这个 watermark 被原封不动地转发给下游的任务。

通俗来讲,watermark 将当前算子中那些 “一旦这个 watermark 被下游任务接收就肯定会就超时” 的窗口全部冲走。

连续窗口操作

通过窗口结果元素的 timestamp 以及窗口与 watermark 的交互,使串联多个窗口操作成为可能。只要指定相同的窗口时长和偏移量,就可以实现有两个连续的窗口,它们既能使用不同的 key,又能让上游操作中某个窗口的数据出现在下游操作的相同窗口。

示例:连续窗口操作。其中第一个时间窗口中在 [0, 5) 中的结果会出现在下一个窗口的 [0, 5) 窗口中。

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());

关于状态大小的考量

窗口可以被定义在很长的时间段上(比如几天、几周或几个月)并且积累下很大的状态。当你估算窗口计算的储存需求时,可以使用几条规则:

  1. Flink 会为一个元素在它所属的每一个窗口中都创建一个副本。 因此,一个元素在滚动窗口的设置中只会存在一个副本(一个元素仅属于一个窗口,除非它迟到了),而在在滑动窗口中可能会被多次拷贝。
  2. ReduceFunctionAggregateFunction 可以极大地减少储存需求,因为他们会就地聚合到达的元素,且每个窗口仅储存一个值;而使用 ProcessWindowFunction 需要累积窗口中所有的元素。
  3. 使用 Evictor 可以避免预聚合,因为窗口中的所有数据必须先经过 evictor 才能进行计算。
文章来源:https://blog.csdn.net/Changxing_J/article/details/135562227
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。