一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本示例是演示ParameterTool的几种用法。
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
本处是通过ParameterTool读取配置文件的内容,其他命令行、系统参数可以在运行时直接设置,没有进行截图。
文件目录及名称:tablesql/src/main/resources/testproperties.properties
jobmanager.rpc.address=server1
jobmanager.rpc.port=6123
jobmanager.memory.process.size=1600m
taskmanager.memory.process.size=4096m
taskmanager.numberOfTaskSlots=3
parallelism.default=1
high-availability=zookeeper
high-availability.storageDir=hdfs://HadoopHAcluster/flink13_5/ha/
high-availability.zookeeper.quorum=server1:2118,server2:2118,server3:2118
##单位毫秒,checkpoint时间间隔
execution.checkpointing.interval=5000
##单位个,保存checkpoint的个数
state.checkpoints.num-retained=20
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
state.savepoints.dir=hdfs:///flink/checkpoints
execution.checkpointing.timeout=600000
execution.checkpointing.min-pause=500
execution.checkpointing.max-concurrent-checkpoints=1
state.backend=filesystem
#state.checkpoints.dir=hdfs://server1:8020/flink13_5-checkpoints
state.checkpoints.dir=hdfs://HadoopHAcluster/flink13_5-checkpoints
jobmanager.execution.failover-strategy=region
web.submit.enable=true
jobmanager.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.web.address=server1
historyserver.web.port=9082
#historyserver.archive.fs.dir=hdfs://server1:8020/flink13_5/completed-jobs/
historyserver.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.archive.fs.refresh-interval=10000
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TestParameterToolDemo {
static void test1() throws Exception {
String propertiesFilePath = "tablesql/src/main/resources/testproperties.properties";
// 方式一:直接通过配置文件的路径获取
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
// 方式二:通过配置文件路径构造File的方式获取
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter2 = ParameterTool.fromPropertiesFile(propertiesFile);
// 方式三:通过配置文件路径构造InputStream的方式获取
InputStream propertiesFileInputStream = new FileInputStream(propertiesFilePath);
ParameterTool parameter3 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
// 遍历配置文件内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// 获取必须的参数,否则会出现异常
System.out.println("jobmanager.rpc.address--->" + parameter.getRequired("jobmanager.rpc.address"));
// 获取参数,并设有默认值
System.out.println("state.checkpoints.num-retained--->" + parameter.get("state.checkpoints.num-retained", "30"));
// 获取Long类型的参数,并设置默认值
System.out.println("parallelism.default--->" + parameter.getLong("parallelism.default", 4L));
// 获取配置文件中有效参数的总行数
System.out.println("getNumberOfParameters--->" + parameter.getNumberOfParameters());
// 运行输出:
// parameter :historyserver.web.address = server1
// parameter :state.checkpoints.num-retained = 20
// parameter :historyserver.web.port = 9082
// parameter :jobmanager.execution.failover-strategy = region
// parameter :jobmanager.rpc.address = server1
// parameter :state.savepoints.dir = hdfs:///flink/checkpoints
// parameter :high-availability.storageDir =
// hdfs://HadoopHAcluster/flink13_5/ha/
// parameter :parallelism.default = 1
// parameter :taskmanager.numberOfTaskSlots = 3
// parameter :historyserver.archive.fs.dir =
// hdfs://HadoopHAcluster/flink13_5/completed-jobs/
// parameter :jobmanager.archive.fs.dir =
// hdfs://HadoopHAcluster/flink13_5/completed-jobs/
// parameter :execution.checkpointing.mode = EXACTLY_ONCE
// parameter :taskmanager.memory.process.size = 4096m
// parameter :jobmanager.memory.process.size = 1600m
// parameter :historyserver.archive.fs.refresh-interval = 10000
// parameter :jobmanager.rpc.port = 6123
// parameter :execution.checkpointing.timeout = 600000
// parameter :execution.checkpointing.interval = 5000
// parameter :high-availability.zookeeper.quorum =
// server1:2118,server2:2118,server3:2118
// parameter :high-availability = zookeeper
// parameter :execution.checkpointing.externalized-checkpoint-retention =
// RETAIN_ON_CANCELLATION
// parameter :web.submit.enable = true
// parameter :state.backend = filesystem
// parameter :execution.checkpointing.min-pause = 500
// parameter :execution.checkpointing.max-concurrent-checkpoints = 1
// parameter :state.checkpoints.dir =
// hdfs://HadoopHAcluster/flink13_5-checkpoints
// jobmanager.rpc.address--->server1
// state.checkpoints.num-retained--->20
// parallelism.default--->1
// getNumberOfParameters--->26
}
static void test2() throws Exception {
ParameterTool parameter = ParameterTool.fromSystemProperties();
// 遍历配置系统属性内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
}
static void test3(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameter.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
}
static void test4(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameters = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameters.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// 获取命令行参数mapParallelism的值(默认设置为2)并设置map的并行度
int parallelism = parameters.getInt("mapParallelism", 2);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return "name:" + lines[0] + " age: " + lines[1];
}).setParallelism(parallelism);
source.print();
env.execute();
}
// 注册全局参数
static void test5(String[] args) throws Exception {
ParameterTool parameters = ParameterTool.fromArgs(args);
// 遍历配配置值来自命令行内容
Map<String, String> parameterMap = parameters.toMap();
for (String key : parameterMap.keySet()) {
System.out.println("parameter :" + key + " = " + parameterMap.get(key));
}
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
// 获取命令行中的 prefix 参数值
String prefix = parameters.getRequired("prefix");
String[] lines = value.split(",");
// 将prefix加在name的前面进行拼接
return "name:" + prefix + "_" + lines[0] + " age: " + lines[1];
}
});
source.print();
env.execute();
}
public static void main(String[] args) throws Exception {
test2();
}
}
以上,本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。