一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了Flink的管理执行的三个内容,即执行配置、打包和分布式运行以及并行执行(设置并行度的几种方式)。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。要更改影响所有作业的默认值,请参阅配置。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
以下是可用的配置选项(下面粗体部分为系统默认值):
setClosureCleanerLevel()
closure cleaner 的级别默认设置为 ClosureCleanerLevel.RECURSIVE。closure cleaner 删除 Flink 程序中对匿名 function 的调用类的不必要引用。禁用 closure cleaner 后,用户的匿名 function 可能正引用一些不可序列化的调用类。这将导致序列化器出现异常。
可设置的值是:
NONE:完全禁用 closure cleaner ,
TOP_LEVEL:只清理顶级类而不递归到字段中,
RECURSIVE:递归清理所有字段。
getParallelism() / setParallelism(int parallelism)
为作业设置默认的并行度。
getMaxParallelism() / setMaxParallelism(int parallelism)
为作业设置默认的最大并行度。此设置决定最大并行度并指定动态缩放的上限。
getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)
设置失败任务重新执行的次数。
值为零会有效地禁用容错。
-1 表示使用系统默认值(在配置中定义)。
该配置已弃用,请改用重启策略 。
getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)
设置系统在作业失败后重新执行之前等待的延迟(以毫秒为单位)。
在 TaskManagers 上成功停止所有任务后,开始计算延迟,一旦延迟过去,任务会被重新启动。
此参数对于延迟重新执行的场景很有用,当尝试重新执行作业时,由于相同的问题,作业会立刻再次失败,该参数便于作业再次失败之前让某些超时相关的故障完全浮出水面(例如尚未完全超时的断开连接)。
此参数仅在执行重试次数为一次或多次时有效。
该配置已被弃用,请改用重启策略 。
getExecutionMode() / setExecutionMode()。默认的执行模式是 PIPELINED
设置执行模式以执行程序。
执行模式定义了数据交换是以批处理方式还是以流方式执行。
enableForceKryo() / disableForceKryo()
默认情况下不强制使用 Kryo。
强制 GenericTypeInformation 对 POJO 使用 Kryo 序列化器,即使我们可以将它们作为 POJO 进行分析。
在某些情况下,应该优先启用该配置。
例如,当 Flink 的内部序列化器无法正确处理 POJO 时。
enableForceAvro() / disableForceAvro()
默认情况下不强制使用 Avro。
强制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化 Avro 的 POJO。
enableObjectReuse() / disableObjectReuse()
默认情况下,Flink 中不重用对象。
启用对象重用模式会指示运行时重用用户对象以获得更好的性能。
当一个算子的用户代码 function 没有意识到这种行为时可能会导致bug。
getGlobalJobParameters() / setGlobalJobParameters()
此方法允许用户将自定义对象设置为作业的全局配置。
由于 ExecutionConfig 可在所有用户定义的 function 中访问,因此这是一种使配置在作业中全局可用的简单方法。
addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
为指定的类型注册 Kryo 序列化器实例。
addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
为指定的类型注册 Kryo 序列化器的类。
registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
使用 Kryo 注册指定类型并为其指定序列化器。
通过使用 Kryo 注册类型,该类型的序列化将更加高效。
registerKryoType(Class<?> type)
如果类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记(整数 ID)被写入。
如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的 I/O 成本。
registerPojoType(Class<?> type)
将指定的类型注册到序列化栈中。
如果该类型最终被序列化为 POJO,那么该类型将注册到 POJO 序列化器中。
如果该类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记被写入。
如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的I/O成本。
用 registerKryoType() 注册的类型对 Flink 的 Kryo 序列化器实例来说是不可用的。
disableAutoTypeRegistration()
自动类型注册在默认情况下是启用的。
自动类型注册是将用户代码使用的所有类型(包括子类型)注册到 Kryo 和 POJO 序列化器。
setTaskCancellationInterval(long interval)
设置尝试连续取消正在运行任务的等待时间间隔(以毫秒为单位)。
当一个任务被取消时,会创建一个新的线程,如果任务线程在一定时间内没有终止,新线程就会定期调用任务线程上的 interrupt() 方法。
这个参数是指连续调用 interrupt() 的时间间隔,默认设置为 30000 毫秒,或 30秒 。
通过 getRuntimeContext() 方法在 Rich* function 中访问到的 RuntimeContext 也允许在所有用户定义的 function 中访问 ExecutionConfig。
Flink 程序可以使用 remote environment 在集群上执行。或者,程序可以被打包成 JAR 文件(Java Archives)执行。如果使用命令行的方式执行程序,将程序打包是必需的。
为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取的 environment。
当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述接口不同,该 environment 会扮演本地环境的角色。
打包程序只要简单地将所有相关的类导出为 JAR 文件,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main 方法)的类。
实现的最简单方法是将 main-class 写入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。
main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile 执行 JAR 文件时寻找 main 方法的类是相同的。
大多数 IDE 提供了在导出 JAR 文件时自动包含该属性的功能。
调用打包后程序的完整流程包括两步:
搜索 JAR 文件 manifest 中的 main-class 或 program-class 属性。如果两个属性同时存在,program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入入口点类名参数。
系统接着调用该类的 main 方法。
一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。
使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。
一个 task 的并行度可以从多个层次指定:
单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:
// 设置 算子 并行度
static void test1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
}).setParallelism(8);// 设置map的并行度
source.print();
env.execute();
}
Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
// 设置 执行环境层次 并行度
static void test2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
});
source.print();
env.execute();
}
将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。
在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
或者在 Java 程序中,可以通过如下方式指定并行度:
说明:
1、该种方法比较复杂,是不是相当于把Flink自身的客户端实现重新实现了一遍呢?大致逻辑如下,代码示例
2、具体实现可以参考其客户端的实现以及测试用例中的实现。
3、客户端的入口类为org.apache.flink.client.cli.CliFrontend;其测试用例类为org.apache.flink.client.program.ClientTest
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestParallelismByClientDemo {
private static final String TEST_EXECUTOR_NAME = "test_executor";
private static Plan plan;
private static Configuration config;
private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
plan = env.createProgramPlan();
config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
// 1、构造PackagedProgram
Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
// <dependency>
// <groupId>org.apache.flink</groupId>
// <artifactId>flink-runtime</artifactId>
// <version>${flink.version}</version>
// </dependency>
String entryPointClass = TestExecute.class.getName();
String jarFilePath = "../examples/flinktest.jar";//打包jar文件的路径
File jarFile = new File(jarFilePath);
List<URL> classpaths = PackagedProgram.getJobJarAndDependencies(jarFile,entryPointClass);
// Creates an instance that wraps the plan defined in the jar file using the given arguments
// For generating the plan the class defined in the className parameter is used.
// private PackagedProgram(
// @Nullable File jarFile, //jarFile The jar file which contains the plan.
// List<URL> classpaths, //classpaths Additional classpath URLs needed by the Program.
// @Nullable String entryPointClassName, //entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest.
// Configuration configuration, //configuration Flink configuration which affects the classloading policy of the Program execution.
// SavepointRestoreSettings savepointRestoreSettings,
// String... args) //args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(jarFile)
.setUserClassPaths(classpaths)
.setEntryPointClassName(entryPointClass)
.setConfiguration(configuration)
.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(configuration))
.setArguments(args)
.build();
// 2、构造客户端执行环境
// public static void executeProgram(
// PipelineExecutorServiceLoader executorServiceLoader,
// Configuration configuration,
// PackagedProgram program,
// boolean enforceSingleJobExecution,
// boolean suppressSysout)
// ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, program, false, false);
}
public static final class TestExecute {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
for (int i = 0; i < 2; i++) {
env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
JobClient jc = env.executeAsync();
jc.getJobExecutionResult();
}
}
}
private static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {
private final ClusterClient<?> clusterClient;
private final Plan plan;
TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) {
this.clusterClient = checkNotNull(clusterClient);
this.plan = checkNotNull(plan);
}
@Override
public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new PipelineExecutorFactory() {
@Override
public String getName() {
return "my-name";
}
@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
}
@Override
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, config, classLoader) -> {
final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(classLoader, plan, config, parallelism);
final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
jobGraph.addJars(accessor.getJars());
jobGraph.setClasspaths(accessor.getClasspaths());
final JobID jobID = clusterClient.submitJob(jobGraph).get();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID, classLoader));
};
}
};
}
@Override
public Stream<String> getExecutorNames() {
throw new UnsupportedOperationException("not implemented");
}
}
}
可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。
更多的信息参考下文链接:
11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)
最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。
默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768。
为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。
从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。
以上,本文介绍了Flink的管理执行的三个内容,即执行配置、打包和分布式运行以及并行执行(设置并行度的几种方式)。