52、Flink的应用程序参数处理-ParameterTool介绍及使用示例

发布时间:2024年01月06日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、应用程序参数处理

几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。

这里说的 Parametertool 并不是必须使用的。

Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。

1、用 ParameterTool 读取配置值

ParameterTool 定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map<string,string> 类型,这样使得它可以很容易地与你的配置集成在一起。

1)、配置值来自 .properties 文件

以下方法可以读取 Properties 文件并解析出键/值对:

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

2)、配置值来自命令行

以下方法可以从命令行中获取参数,如 --input hdfs:///mydata --elements 42。

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
    
 }
 

3)、配置值来自系统属性

启动 JVM 时,可以将系统属性传递给 JVM:-Dinput=hdfs:///mydata。你也可以从这些系统属性初始化 ParameterTool:

ParameterTool parameter = ParameterTool.fromSystemProperties();

2、在 Flink 程序中使用参数

1)、直接从 ParameterTool 获取

ParameterTool 本身具有访问配置值的方法。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters();
// .. there are more methods available.

你可以在提交应用程序时直接在客户端的 main() 方法中使用这些方法的返回值。例如,你可以这样设置算子的并行度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 是序列化的,你可以将其传递给函数本身:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内使用它以获取命令行的传递的参数。

2)、注册全局参数

从 JobManager web 界面和用户定义的所有函数中可以以配置值的方式访问在 ExecutionConfig 中注册的全局作业参数。

  • 注册全局参数
ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
  • 在任意富函数中访问参数
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        ParameterTool parameters = (ParameterTool)
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        parameters.getRequired("input");
        // .. do more ..

二、示例:ParameterTool几种的应用示例

本示例是将上述的内容以可运行的代码呈现。

1、maven依赖

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<!-- <scope>provided</scope> -->
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
</dependencies>

2、实现及验证

本处是通过ParameterTool读取配置文件的内容,其他命令行、系统参数可以在运行时直接设置,没有进行截图。

1)、测试文件准备

文件目录及名称:tablesql/src/main/resources/testproperties.properties

jobmanager.rpc.address=server1
jobmanager.rpc.port=6123
jobmanager.memory.process.size=1600m
taskmanager.memory.process.size=4096m
taskmanager.numberOfTaskSlots=3
parallelism.default=1

high-availability=zookeeper
high-availability.storageDir=hdfs://HadoopHAcluster/flink13_5/ha/
high-availability.zookeeper.quorum=server1:2118,server2:2118,server3:2118

##单位毫秒,checkpoint时间间隔
execution.checkpointing.interval=5000
##单位个,保存checkpoint的个数
state.checkpoints.num-retained=20

execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION

state.savepoints.dir=hdfs:///flink/checkpoints
execution.checkpointing.timeout=600000
execution.checkpointing.min-pause=500
execution.checkpointing.max-concurrent-checkpoints=1

state.backend=filesystem
#state.checkpoints.dir=hdfs://server1:8020/flink13_5-checkpoints
state.checkpoints.dir=hdfs://HadoopHAcluster/flink13_5-checkpoints
jobmanager.execution.failover-strategy=region

web.submit.enable=true


jobmanager.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.web.address=server1
historyserver.web.port=9082
#historyserver.archive.fs.dir=hdfs://server1:8020/flink13_5/completed-jobs/
historyserver.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/

historyserver.archive.fs.refresh-interval=10000

2)、实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestParameterToolDemo {

    static void test1() throws Exception {
        String propertiesFilePath = "tablesql/src/main/resources/testproperties.properties";

        // 方式一:直接通过配置文件的路径获取
        ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

        // 方式二:通过配置文件路径构造File的方式获取
        File propertiesFile = new File(propertiesFilePath);
        ParameterTool parameter2 = ParameterTool.fromPropertiesFile(propertiesFile);

        // 方式三:通过配置文件路径构造InputStream的方式获取
        InputStream propertiesFileInputStream = new FileInputStream(propertiesFilePath);
        ParameterTool parameter3 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

        // 遍历配置文件内容
        Map<String, String> parameterMap = parameter.toMap();
        for (String key : parameterMap.keySet()) {
            System.out.println("parameter :" + key + " = " + parameterMap.get(key));
        }

        // 获取必须的参数,否则会出现异常
        System.out.println("jobmanager.rpc.address--->" + parameter.getRequired("jobmanager.rpc.address"));
        // 获取参数,并设有默认值
        System.out.println("state.checkpoints.num-retained--->" + parameter.get("state.checkpoints.num-retained", "30"));
        // 获取Long类型的参数,并设置默认值
        System.out.println("parallelism.default--->" + parameter.getLong("parallelism.default", 4L));
        // 获取配置文件中有效参数的总行数
        System.out.println("getNumberOfParameters--->" + parameter.getNumberOfParameters());

        // 运行输出:
        // parameter :historyserver.web.address = server1
        // parameter :state.checkpoints.num-retained = 20
        // parameter :historyserver.web.port = 9082
        // parameter :jobmanager.execution.failover-strategy = region
        // parameter :jobmanager.rpc.address = server1
        // parameter :state.savepoints.dir = hdfs:///flink/checkpoints
        // parameter :high-availability.storageDir =
        // hdfs://HadoopHAcluster/flink13_5/ha/
        // parameter :parallelism.default = 1
        // parameter :taskmanager.numberOfTaskSlots = 3
        // parameter :historyserver.archive.fs.dir =
        // hdfs://HadoopHAcluster/flink13_5/completed-jobs/
        // parameter :jobmanager.archive.fs.dir =
        // hdfs://HadoopHAcluster/flink13_5/completed-jobs/
        // parameter :execution.checkpointing.mode = EXACTLY_ONCE
        // parameter :taskmanager.memory.process.size = 4096m
        // parameter :jobmanager.memory.process.size = 1600m
        // parameter :historyserver.archive.fs.refresh-interval = 10000
        // parameter :jobmanager.rpc.port = 6123
        // parameter :execution.checkpointing.timeout = 600000
        // parameter :execution.checkpointing.interval = 5000
        // parameter :high-availability.zookeeper.quorum =
        // server1:2118,server2:2118,server3:2118
        // parameter :high-availability = zookeeper
        // parameter :execution.checkpointing.externalized-checkpoint-retention =
        // RETAIN_ON_CANCELLATION
        // parameter :web.submit.enable = true
        // parameter :state.backend = filesystem
        // parameter :execution.checkpointing.min-pause = 500
        // parameter :execution.checkpointing.max-concurrent-checkpoints = 1
        // parameter :state.checkpoints.dir =
        // hdfs://HadoopHAcluster/flink13_5-checkpoints
        // jobmanager.rpc.address--->server1
        // state.checkpoints.num-retained--->20
        // parallelism.default--->1
        // getNumberOfParameters--->26
    }

    static void test2() throws Exception {
        ParameterTool parameter = ParameterTool.fromSystemProperties();
        // 遍历配置系统属性内容
        Map<String, String> parameterMap = parameter.toMap();
        for (String key : parameterMap.keySet()) {
            System.out.println("parameter :" + key + " = " + parameterMap.get(key));
        }
    }

    static void test3(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        // 遍历配配置值来自命令行内容
        Map<String, String> parameterMap = parameter.toMap();
        for (String key : parameterMap.keySet()) {
            System.out.println("parameter :" + key + " = " + parameterMap.get(key));
        }
    }

    static void test4(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameters = ParameterTool.fromArgs(args);

        // 遍历配配置值来自命令行内容
        Map<String, String> parameterMap = parameters.toMap();
        for (String key : parameterMap.keySet()) {
            System.out.println("parameter :" + key + " = " + parameterMap.get(key));
        }

        // 获取命令行参数mapParallelism的值(默认设置为2)并设置map的并行度
        int parallelism = parameters.getInt("mapParallelism", 2);

        DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return "name:" + lines[0] + " age: " + lines[1];
                }).setParallelism(parallelism);

        source.print();

        env.execute();
    }

    // 注册全局参数
    static void test5(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);

        // 遍历配配置值来自命令行内容
        Map<String, String> parameterMap = parameters.toMap();
        for (String key : parameterMap.keySet()) {
            System.out.println("parameter :" + key + " = " + parameterMap.get(key));
        }
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        DataStream<String> source = env.socketTextStream("192.168.10.42", 8888)
                .map(new RichMapFunction<String, String>() {

                    @Override
                    public String map(String value) throws Exception {
                        ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig()
                                .getGlobalJobParameters();
                        // 获取命令行中的 prefix 参数值
                        String prefix = parameters.getRequired("prefix");

                        String[] lines = value.split(",");
                        // 将prefix加在name的前面进行拼接
                        return "name:" + prefix + "_" + lines[0] + " age: " + lines[1];
                    }

                });

        source.print();

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

以上,本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/134728744
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。