一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了Flink的并行度设置的几种方式以及并行度最大的值。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。
使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。
一个 task 的并行度可以从多个层次指定:
单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:
// 设置 算子 并行度
static void test1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
}).setParallelism(8);// 设置map的并行度
source.print();
env.execute();
}
Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
// 设置 执行环境层次 并行度
static void test2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
});
source.print();
env.execute();
}
将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。
在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
或者在 Java 程序中,可以通过如下方式指定并行度:
说明:
1、该种方法比较复杂,是不是相当于把Flink自身的客户端实现重新实现了一遍呢?大致逻辑如下,代码示例
2、具体实现可以参考其客户端的实现以及测试用例中的实现。
3、客户端的入口类为org.apache.flink.client.cli.CliFrontend;其测试用例类为org.apache.flink.client.program.ClientTest
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestParallelismByClientDemo {
private static final String TEST_EXECUTOR_NAME = "test_executor";
private static Plan plan;
private static Configuration config;
private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
plan = env.createProgramPlan();
config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
// 1、构造PackagedProgram
Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
// <dependency>
// <groupId>org.apache.flink</groupId>
// <artifactId>flink-runtime</artifactId>
// <version>${flink.version}</version>
// </dependency>
String entryPointClass = TestExecute.class.getName();
String jarFilePath = "../examples/flinktest.jar";//打包jar文件的路径
File jarFile = new File(jarFilePath);
List<URL> classpaths = PackagedProgram.getJobJarAndDependencies(jarFile,entryPointClass);
// Creates an instance that wraps the plan defined in the jar file using the given arguments
// For generating the plan the class defined in the className parameter is used.
// private PackagedProgram(
// @Nullable File jarFile, //jarFile The jar file which contains the plan.
// List<URL> classpaths, //classpaths Additional classpath URLs needed by the Program.
// @Nullable String entryPointClassName, //entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest.
// Configuration configuration, //configuration Flink configuration which affects the classloading policy of the Program execution.
// SavepointRestoreSettings savepointRestoreSettings,
// String... args) //args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(jarFile)
.setUserClassPaths(classpaths)
.setEntryPointClassName(entryPointClass)
.setConfiguration(configuration)
.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(configuration))
.setArguments(args)
.build();
// 2、构造客户端执行环境
// public static void executeProgram(
// PipelineExecutorServiceLoader executorServiceLoader,
// Configuration configuration,
// PackagedProgram program,
// boolean enforceSingleJobExecution,
// boolean suppressSysout)
// ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, program, false, false);
}
public static final class TestExecute {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
for (int i = 0; i < 2; i++) {
env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
JobClient jc = env.executeAsync();
jc.getJobExecutionResult();
}
}
}
private static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {
private final ClusterClient<?> clusterClient;
private final Plan plan;
TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) {
this.clusterClient = checkNotNull(clusterClient);
this.plan = checkNotNull(plan);
}
@Override
public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new PipelineExecutorFactory() {
@Override
public String getName() {
return "my-name";
}
@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
}
@Override
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, config, classLoader) -> {
final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(classLoader, plan, config, parallelism);
final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
jobGraph.addJars(accessor.getJars());
jobGraph.setClasspaths(accessor.getClasspaths());
final JobID jobID = clusterClient.submitJob(jobGraph).get();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID, classLoader));
};
}
};
}
@Override
public Stream<String> getExecutorNames() {
throw new UnsupportedOperationException("not implemented");
}
}
}
可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。
更多的信息参考下文链接:
11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)
最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。
默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768。
为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。
从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。
以上,本文介绍了Flink的并行度设置的几种方式以及并行度最大的值。