II.Flink中的 状态

发布时间:2024年01月23日

目录

按键分区状态(Keyed State)

1.1值状态(ValueState)

1.2列表状态(ListState)

1.3归约状态(ReducingState)

1.5聚合状态(AggregatingState)

1.5Map状态(MapState)

算子状态(Operator State)

CheckpointedFunction


这篇文章是详细化上一篇文章

按键分区状态(Keyed State)

????????按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。

????????需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

????????keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在?KeyedStream?上使用,在Java/Scala API上可以通过?stream.keyBy(...)?得到?KeyedStream

?1.1值状态(ValueState)

ValueState<T>: 保存一个可以更新和检索的值每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过?update(T)?进行更新,通过?T value()?进行检索。

1.2列表状态(ListState)

ListState<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过?add(T)?或者?addAll(List<T>)?进行添加元素,通过?Iterable<T> get()?获得整个列表。还可以通过?update(List<T>)?覆盖当前的列表。

1.3归约状态(ReducingState)

ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。接口与?ListState?类似,但使用?add(T)?增加元素,会使用提供的?ReduceFunction?进行聚合。

1.5聚合状态(AggregatingState)

AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和?ReducingState?相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与?ListState?类似,但使用?add(IN)?添加的元素会用指定的?AggregateFunction?进行聚合。

1.5Map状态(MapState)

MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用?put(UK,UV)?或者?putAll(Map<UK,UV>)?添加映射。 使用?get(UK)?检索特定 key。 使用?entries()keys()?和?values()?分别检索映射、键和值的可迭代视图。你还可以通过?isEmpty()?来判断是否包含任何键值对。

????????所有类型的状态还有一个clear()?方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

????????请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。

????????必须创建一个?StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptor,?AggregatingStateDescriptor,?ReducingStateDescriptor?或?MapStateDescriptor

状态通过?RuntimeContext?进行访问,因此只能在?rich functions?中使用。

下面是一个?FlatMapFunction?的例子,展示了如何将这些部分组合起来:

java:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;
    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();
        // update the count
        currentSum.f0 += 1;
        // add the second field of the input value
        currentSum.f1 += input.f1;
        // update the state
        sum.update(currentSum);
        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();
// the printed output will be (1,4) and (1,5)

scala:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
  private var sum: ValueState[(Long, Long)] = _
  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // access the state value
    val tmpCurrentSum = sum.value
    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }
    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    // update the state
    sum.update(newSum)
    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }
  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}
object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)
  env.execute("ExampleKeyedState")
}

算子状态(Operator State)

???????

????????算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector?是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。

????????当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。

????????在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。

????????通过实现?CheckpointedFunction?接口来使用 operator state

CheckpointedFunction

????????CheckpointedFunction?接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

?

????????进行 checkpoint 时会调用?snapshotState()。 用户自定义函数初始化时会调用?initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此?initializeState()?不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

????????当前 operator state 以 list 的形式存在。这些状态是一个?可序列化?对象的集合?List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:

  • Even-split redistribution:?每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素?element1?和?element2,当并发读增加为 2 时,element1?会被分到并发 0 上,element2?则会被分到并发 1 上。

  • Union redistribution:?每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。?

????????下面的例子中的?SinkFunction?在?CheckpointedFunction?中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。

java:scala:

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {
  @transient
  private var checkpointedState: ListState[(String, Int)] = _
  private val bufferedElements = ListBuffer[(String, Int)]()
  override def invoke(value: (String, Int), context: Context): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    if(context.isRestored) {
      for(element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
 

initializeState?方法接收一个?FunctionInitializationContext?参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个?ListState?用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类似,StateDescriptor?会包括状态名字、以及状态类型相关信息。

java:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);

scala:


val descriptor = new ListStateDescriptor[(String, Long)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如?getUnionListState(descriptor)?会使用 union redistribution 算法, 而?getListState(descriptor)?则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过?isRestored()?方法判断是否从之前的故障中恢复回来,如果该方法返回?true?则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink?中初始化时,恢复回来的?ListState?的所有元素会添加到一个局部变量中,供下次?snapshotState()?时使用。 然后清空?ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在?initializeState()?方法中使用?FunctionInitializationContext?初始化 keyed state。

文章来源:https://blog.csdn.net/weixin_61070671/article/details/135718991
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。