目录
这篇文章是详细化上一篇文章
????????按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。
????????需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。
????????keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在?KeyedStream
?上使用,在Java/Scala API上可以通过?stream.keyBy(...)
?得到?KeyedStream
ValueState<T>
: 保存一个可以更新和检索的值(每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过?update(T)
?进行更新,通过?T value()
?进行检索。
ListState<T>
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过?add(T)
?或者?addAll(List<T>)
?进行添加元素,通过?Iterable<T> get()
?获得整个列表。还可以通过?update(List<T>)
?覆盖当前的列表。
ReducingState<T>
: 保存一个单值,表示添加到状态的所有值的聚合。接口与?ListState
?类似,但使用?add(T)
?增加元素,会使用提供的?ReduceFunction
?进行聚合。
AggregatingState<IN, OUT>
: 保留一个单值,表示添加到状态的所有值的聚合。和?ReducingState
?相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与?ListState
?类似,但使用?add(IN)
?添加的元素会用指定的?AggregateFunction
?进行聚合。
MapState<UK, UV>
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用?put(UK,UV)
?或者?putAll(Map<UK,UV>)
?添加映射。 使用?get(UK)
?检索特定 key。 使用?entries()
,keys()
?和?values()
?分别检索映射、键和值的可迭代视图。你还可以通过?isEmpty()
?来判断是否包含任何键值对。
????????所有类型的状态还有一个clear()
?方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
????????请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。
????????必须创建一个?StateDescriptor
,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction
。 根据不同的状态类型,可以创建ValueStateDescriptor
,ListStateDescriptor
,?AggregatingStateDescriptor
,?ReducingStateDescriptor
?或?MapStateDescriptor
。
状态通过?RuntimeContext
?进行访问,因此只能在?rich functions?中使用。
下面是一个?FlatMapFunction
?的例子,展示了如何将这些部分组合起来:
java:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
scala:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleKeyedState")
}
???????
????????算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector?是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。
????????当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。
????????在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。
????????通过实现?CheckpointedFunction
?接口来使用 operator state
????????CheckpointedFunction
?接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
?
????????进行 checkpoint 时会调用?snapshotState()
。 用户自定义函数初始化时会调用?initializeState()
,初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此?initializeState()
?不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。
????????当前 operator state 以 list 的形式存在。这些状态是一个?可序列化?对象的集合?List
,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
Even-split redistribution:?每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素?element1
?和?element2
,当并发读增加为 2 时,element1
?会被分到并发 0 上,element2
?则会被分到并发 1 上。
Union redistribution:?每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。?
????????下面的例子中的?SinkFunction
?在?CheckpointedFunction
?中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。
java:scala:
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int), context: Context): Unit = {
bufferedElements += value
if (bufferedElements.size >= threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if(context.isRestored) {
for(element <- checkpointedState.get().asScala) {
bufferedElements += element
}
}
}
}
initializeState
?方法接收一个?FunctionInitializationContext
?参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个?ListState
?用于在 checkpoint 时保存 non-keyed state 对象。
注意这些状态是如何初始化的,和 keyed state 类似,StateDescriptor
?会包括状态名字、以及状态类型相关信息。
java:
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
scala:
val descriptor = new ListStateDescriptor[(String, Long)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如?getUnionListState(descriptor)
?会使用 union redistribution 算法, 而?getListState(descriptor)
?则简单的使用 even-split redistribution 算法。
当初始化好状态对象后,我们通过?isRestored()
?方法判断是否从之前的故障中恢复回来,如果该方法返回?true
?则表示从故障中进行恢复,会执行接下来的恢复逻辑。
正如代码所示,BufferingSink
?中初始化时,恢复回来的?ListState
?的所有元素会添加到一个局部变量中,供下次?snapshotState()
?时使用。 然后清空?ListState
,再把当前局部变量中的所有元素写入到 checkpoint 中。
另外,我们同样可以在?initializeState()
?方法中使用?FunctionInitializationContext
?初始化 keyed state。