目标:了解数据源的格式及实现模拟数据的生成
路径
实施
数据格式
消息时间 | 发件人昵称 | 发件人账号 | 发件人性别 | 发件人IP | 发件人系统 | 发件人手机型号 | 发件人网络制式 | 发件人GPS | 收件人昵称 | 收件人IP | 收件人账号 | 收件人系统 | 收件人手机型号 | 收件人网络制式 | 收件人GPS | 收件人性别 | 消息类型 | 双方距离 | 消息 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
msg_time | sender_nickyname | sender_account | sender_sex | sender_ip | sender_os | sender_phone_type | sender_network | sender_gps | receiver_nickyname | receiver_ip | receiver_account | receiver_os | receiver_phone_type | receiver_network | receiver_gps | receiver_sex | msg_type | distance | message |
2020/05/08 15:11:33 | 古博易 | 14747877194 | 男 | 48.147.134.255 | Android 8.0 | 小米 Redmi K30 | 4G | 94.704577,36.247553 | 莱优 | 97.61.25.52 | 17832829395 | IOS 10.0 | Apple iPhone 10 | 4G | 84.034145,41.423804 | 女 | TEXT | 77.82KM | 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。 |
数据生成
创建原始文件目录
mkdir /export/data/momo_init
上传模拟数据程序
cd /export/data/momo_init
rz
创建模拟数据目录
mkdir /export/data/momo_data
运行程序生成数据
语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001
小结
目标:掌握实时案例的技术架构及技术选型
路径
实施
需求分析
技术选型
技术架构
小结
目标:回顾Flume基本使用及实现Flume的安装测试
路径
实施
Flume的回顾
Flume的安装
上传安装包
cd /export/software/
rz
解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
修改配置
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0
删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
创建目录
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position
Flume的测试
需求:采集聊天数据,写入HDFS
分析
开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动HDFS
start-dfs.sh
运行Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
查看结果
小结
目标:实现案例Flume采集程序的开发
路径
实施
需求分析
需求:采集聊天数据,实时写入Kafka
Source:taildir
Channel:mem
Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
测试实现
启动Kafka
start-zk-all.sh
start-kafka.sh
创建Topic
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
注意:Kafka2.11版本用–zookeeper 替代
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092
列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
观察结果
小结