使用flume做实时数据采集

发布时间:2024年01月17日

在这里插入图片描述

实时数据采集

实时采集要求:

在主节点使用Flume采集/data_log目录下实时日志文件中的数据,将数据存入到Kafka的Topic中(Topic名称为ChangeRecord分区数为4)

实时采集前置准备

模拟数据源环境

运行/data_log路径下的make_data_file_v1

./make_data_file_v1
创建Topic并设置名称ChangeRecord
kafka-topics.sh --create --bootstrap-server bigdata1:9092 --replication-factor 1 --partitions 4 --topic ChangeRecord

实时采集

编写flume采集的配置文件

在flume 文件下写入一个实时采集配置文件名gongye_1_logger.conf(可改)

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f +0 /data_log/2024-01-15@16:41-changerecord.csv
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ChangeRecord
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume 实时采集
flume-ng agent -n a1 -c conf/ -f /opt/module/flume-1.9.0/gongye_1_logger.conf  -Dflume.root.logger=INFO,console
kafka消费数据
kafka-console-consumer.sh --bootstrap-server bigdata1:9092,bigdata2:9092,bigdata3:9092 --topic ChangeRecord --from-beginning
文章来源:https://blog.csdn.net/2301_80792800/article/details/135616367
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。