该需求为实时接收对手Topic,并进行消费落盘至Hive。
在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。
本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。
华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html
着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。
保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。
运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。
参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html
在Linux环境中执行:
bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic
创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。
同时也可以开启两个新的终端窗口用于测试生产者和消费者:
bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties
参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html
通过以下网站下载华为MRS所需的样例代码:
https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0
下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。
镜像地址:
https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/
华为开源镜像站:
https://mirrors.huaweicloud.com/home
完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。
在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。
创建生产测试时,首先需要修改KafkaProperties
类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer
类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2
小节中开启的消费者窗口就能接受到生产的数据。
在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。
至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。
如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。
参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html
用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。
图为Flink在华为MRS安全模式的认证体系。
对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink
、/flink-checkpoint
目录获取权限,保证读
,写
,执行
。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。
Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。
参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html
最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。
在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:
if (LoginUtil.isSecurityModel()) {
try {
LOG.info("Securitymode start.");
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
} catch (IOException e) {
LOG.error("Security prepare failure.");
LOG.error("The IOException occured : {}.", e);
return;
}
LOG.info("Security prepare success.");
}
对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。
在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer
方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。
FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink
方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner
为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd
,这样便于使用shell调度每日增量数据时日期变量的传递。
FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/
另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)
开启checkpoint,该参数是60秒开启一次。
完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run
提交到yarn,此时可以指定并行度及其他配置。
通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd
命名的文件夹中。
HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。
source华为的环境,认证状态成功;
创建日期变量:c_date=$(date '+%Y-%m-%d')
;
在beeline -u中执行HiveSQL代码:
使用beeline的变量函数--hivevar
将在外部注册的c_date
变量注册为hive beeline的变量;
创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;
将临时表中的数据解析,一般是json数据,可通过get_json_object
函数解析为字段,insert into table
存入贴源层正式表;
删除临时表;
有需要的话,也可以添加日志路径,将执行结果追加至日志。
该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd
格式,与flink写入在HDFS中的每日增量文件夹名相同;
然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;
解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。
需要注意的是,hive -e
命令似乎由于认证安全设置,无法在华为集群节点机使用。
通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。
需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。