【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

发布时间:2023年12月25日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了Flink State中的operator state基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要hadoop环境,因为模拟checkpoint的时候使用了hdfs。

本专题分为以下几篇文章:

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

关于Flink state的更多介绍参考文章:

8、Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
    <dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>3.1.4</version>
	</dependency>
</dependencies>

二、Operator State

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。

1、CheckpointedFunction

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。

  • Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

下面的例子中的 SinkFunction 在 CheckpointedFunction 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState 方法接收一个 FunctionInitializationContext 参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类系,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用。 然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

2、带状态的 Source Function

带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。

public static class CounterSource extends RichParallelSourceFunction<Long>  implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** 存储 state 的变量. */
    private ListState<Long> state;
     
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            "state",
            LongSerializer.INSTANCE));
            
        // 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.clear();
        state.add(offset);
    }
}

希望订阅 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener 接口。

3、operator state示例:实现程序异常时自动保存state,当超过重启次数时中断运行

实际生产中,一般不需要自己实现state,除非特殊情况。

本示例仅仅用于展示state的工作过程。

本示例实现功能是当程序出现异常时能自动重启并保存当前的state信息,当超过2次异常后程序中断运行。

该示例肯定是画蛇添足,Flink已经实现了该类,并且在介绍operator state的时候也给出了示例,本示例仅仅是以极其简单的介绍一下operator state的实现。

1)、实现

  • AlanOperatorState.java
import java.util.Iterator;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * @author alanchan 
 * 使用OperatorState中的ListState模拟KafkaSource进行offset维护
 */
public class AlanOperatorState extends RichParallelSourceFunction<String> implements CheckpointedFunction {
	private boolean flag = true;
	private ListState<Long> offsetState = null;
	private Long offset = 0L;

	// 创建ListState
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
		offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
	}

	// 使用state
	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (flag) {
			Iterator<Long> iterator = offsetState.get().iterator();
			// 由于是模拟,该迭代器仅有一条数据
			if (iterator.hasNext()) {
				offset = iterator.next();
			}
			offset += 1;
			int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
			ctx.collect("subTaskId:" + subTaskId + ",当前的offset值为::" + offset);
			Thread.sleep(1000);

			// 模拟异常
			if (offset % 3 == 0) {
				throw new Exception("出现了异常.....");
			}

		}
	}

	// 持久化state, 该方法会定时执行将state状态从内存存入Checkpoint磁盘目录中
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		offsetState.clear();// 清理内容数据并存入Checkpoint磁盘目录中
		offsetState.add(offset);
	}

	@Override
	public void cancel() {
		flag = false;
	}

}

  • TestOperatorStateDemo.java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestOperatorStateDemo {
	public static void main(String[] args) throws Exception {

		System.setProperty("HADOOP_USER_NAME", "alanchan");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		env.setParallelism(1);
		env.enableCheckpointing(1000);
		// 设置checkpoint点在hdfs上
		env.setStateBackend(new FsStateBackend("hdfs://server1:8020//flinktest/flinkckp"));
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

		// 重启策略:程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

		// source
		DataStreamSource<String> ds = env.addSource(new AlanOperatorState()).setParallelism(1);

		// transformation

		// sink
		ds.print();

		// execute
		env.execute();
	}
}

2)、运行结果

subTaskId:0,当前的offset值为::1
subTaskId:0,当前的offset值为::2
subTaskId:0,当前的offset值为::3
subTaskId:0,当前的offset值为::4
subTaskId:0,当前的offset值为::5
subTaskId:0,当前的offset值为::6
subTaskId:0,当前的offset值为::7
subTaskId:0,当前的offset值为::8
subTaskId:0,当前的offset值为::9
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	
。。。。。。

	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=3000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	
。。。。。。

	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.Exception: 出现了异常.....
	at org.datastreamapi.state.AlanOperatorState.run(AlanOperatorState.java:46)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)


3)、hdfs上的checkpoint

在这里插入图片描述

以上,本文介绍了Flink State中的operator state基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/134934400
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。