一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。
为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。
这里说的 Parametertool 并不是必须使用的。
Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。
ParameterTool 定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map<string,string> 类型,这样使得它可以很容易地与你的配置集成在一起。
以下方法可以读取 Properties 文件并解析出键/值对:
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
以下方法可以从命令行中获取参数,如 --input hdfs:///mydata --elements 42。
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
}
启动 JVM 时,可以将系统属性传递给 JVM:-Dinput=hdfs:///mydata。你也可以从这些系统属性初始化 ParameterTool:
ParameterTool parameter = ParameterTool.fromSystemProperties();
ParameterTool 本身具有访问配置值的方法。
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters();
// .. there are more methods available.
你可以在提交应用程序时直接在客户端的 main() 方法中使用这些方法的返回值。例如,你可以这样设置算子的并行度:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
由于 ParameterTool 是序列化的,你可以将其传递给函数本身:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在函数内使用它以获取命令行的传递的参数。
从 JobManager web 界面和用户定义的所有函数中可以以配置值的方式访问在 ExecutionConfig 中注册的全局作业参数。
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
本示例是将上述的内容以可运行的代码呈现。
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
本处是通过ParameterTool读取配置文件的内容,其他命令行、系统参数可以在运行时直接设置,没有进行截图。
文件目录及名称:tablesql/src/main/resources/testproperties.properties
jobmanager.rpc.address=server1
jobmanager.rpc.port=6123
jobmanager.memory.process.size=1600m
taskmanager.memory.process.size=4096m
taskmanager.numberOfTaskSlots=3
parallelism.default=1
high-availability=zookeeper
high-availability.storageDir=hdfs://HadoopHAcluster/flink13_5/ha/
high-availability.zookeeper.quorum=server1:2118,server2:2118,server3:2118
##单位毫秒,checkpoint时间间隔
execution.checkpointing.interval=5000
##单位个,保存checkpoint的个数
state.checkpoints.num-retained=20
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
state.savepoints.dir=hdfs:///flink/checkpoints
execution.checkpointing.timeout=600000
execution.checkpointing.min-pause=500
execution.checkpointing.max-concurrent-checkpoints=1
state.backend=filesystem
#state.checkpoints.dir=hdfs://server1:8020/flink13_5-checkpoints
state.checkpoints.dir=hdfs://HadoopHAcluster/flink13_5-checkpoints
jobmanager.execution.failover-strategy=region
web.submit.enable=true
jobmanager.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.web.address=server1
historyserver.web.port=9082
#historyserver.archive.fs.dir=hdfs://server1:8020/flink13_5/completed-jobs/
historyserver.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.archive.fs.refresh-interval=10000
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TestParameterToolDemo {
static void test1() throws Exception {
String propertiesFilePath = "tablesql/src/main/resources/testproperties.properties";
// 方式一:直接通过配置文件的路径获取
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
// 方式二:通过配置文件路径构造File的方式获取
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter2 = ParameterTool.fromPropertiesFile(propertiesFile);
// 方式三:通过配置文件路径构造InputStream的方式获取
InputStream propertiesFileInputStream = new FileInputStream(propertiesFilePath);
ParameterTool parameter3 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
// 遍历配置文件内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// 获取必须的参数,否则会出现异常
System.out.println("jobmanager.rpc.address--->" + parameter.getRequired("jobmanager.rpc.address"));
// 获取参数,并设有默认值
System.out.println("state.checkpoints.num-retained--->" + parameter.get("state.checkpoints.num-retained", "30"));
// 获取Long类型的参数,并设置默认值
System.out.println("parallelism.default--->" + parameter.getLong("parallelism.default", 4L));
// 获取配置文件中有效参数的总行数
System.out.println("getNumberOfParameters--->" + parameter.getNumberOfParameters());
// 运行输出:
// parameter :historyserver.web.address = server1
// parameter :state.checkpoints.num-retained = 20
// parameter :historyserver.web.port = 9082
// parameter :jobmanager.execution.failover-strategy = region
// parameter :jobmanager.rpc.address = server1
// parameter :state.savepoints.dir = hdfs:///flink/checkpoints
// parameter :high-availability.storageDir =
// hdfs://HadoopHAcluster/flink13_5/ha/
// parameter :parallelism.default = 1
// parameter :taskmanager.numberOfTaskSlots = 3
// parameter :historyserver.archive.fs.dir =
// hdfs://HadoopHAcluster/flink13_5/completed-jobs/
// parameter :jobmanager.archive.fs.dir =
// hdfs://HadoopHAcluster/flink13_5/completed-jobs/
// parameter :execution.checkpointing.mode = EXACTLY_ONCE
// parameter :taskmanager.memory.process.size = 4096m
// parameter :jobmanager.memory.process.size = 1600m
// parameter :historyserver.archive.fs.refresh-interval = 10000
// parameter :jobmanager.rpc.port = 6123
// parameter :execution.checkpointing.timeout = 600000
// parameter :execution.checkpointing.interval = 5000
// parameter :high-availability.zookeeper.quorum =
// server1:2118,server2:2118,server3:2118
// parameter :high-availability = zookeeper
// parameter :execution.checkpointing.externalized-checkpoint-retention =
// RETAIN_ON_CANCELLATION
// parameter :web.submit.enable = true
// parameter :state.backend = filesystem
// parameter :execution.checkpointing.min-pause = 500
// parameter :execution.checkpointing.max-concurrent-checkpoints = 1
// parameter :state.checkpoints.dir =
// hdfs://HadoopHAcluster/flink13_5-checkpoints
// jobmanager.rpc.address--->server1
// state.checkpoints.num-retained--->20
// parallelism.default--->1
// getNumberOfParameters--->26
}
static void test2() throws Exception {
ParameterTool parameter = ParameterTool.fromSystemProperties();
// 遍历配置系统属性内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
}
static void test3(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
}
static void test4(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameters = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameters.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// 获取命令行参数mapParallelism的值(默认设置为2)并设置map的并行度
int parallelism = parameters.getInt("mapParallelism", 2);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
}).setParallelism(parallelism);
source.print();
env.execute();
}
// 注册全局参数
static void test5(String[] args) throws Exception {
ParameterTool parameters = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameters.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
// 获取命令行中的 prefix 参数值
String prefix = parameters.getRequired("prefix");
String[] lines = value.split(",");
// 将prefix加在name的前面进行拼接
return "name:" + prefix + "_" + lines[0] + " age: " + lines[1];
}
});
source.print();
env.execute();
}
public static void main(String[] args) throws Exception {
test2();
}
}
以上,本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。