On the master node, use Flume to collect the data from the real-time log files under the /data_log directory and write it into a Kafka topic (topic name: ChangeRecord, with 4 partitions).
Run make_data_file_v1 under the /data_log path to start generating the real-time log data:
./make_data_file_v1
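As a quick sanity check (optional; assuming the generator appends to the CSV file referenced in the Flume source below), confirm that new data is actually being written:

# list the generated files, then watch the log file grow
ls -l /data_log
tail -f /data_log/2024-01-15@16:41-changerecord.csv

Next, create the ChangeRecord topic with 4 partitions: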
kafka-topics.sh --create --bootstrap-server bigdata1:9092 --replication-factor 1 --partitions 4 --topic ChangeRecord
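To confirm the topic exists with the expected partition count (an optional check, not required by the task):

kafka-topics.sh --describe --bootstrap-server bigdata1:9092 --topic ChangeRecord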
Under the Flume installation directory, create a real-time collection configuration file named gongye_1_logger.conf (the name can be changed):
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# read the file from the beginning and keep following it as new lines are appended
a1.sources.r1.command = tail -F -n +1 /data_log/2024-01-15@16:41-changerecord.csv
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ChangeRecord
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
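Optionally, the following standard Flume 1.9 KafkaSink properties (shown here with their default values; not required by the task) can be appended to the same file to make the batching and acknowledgement behaviour explicit:

a1.sinks.k1.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1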
Start the Flume agent:

flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/gongye_1_logger.conf -Dflume.root.logger=INFO,console
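If the agent needs to keep running while you work in the same session (an optional variant, not part of the original steps), it can be started in the background instead:

nohup flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/gongye_1_logger.conf > /opt/module/flume-1.9.0/flume.log 2>&1 &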
In another terminal, verify that data is flowing into the topic with a console consumer:

kafka-console-consumer.sh --bootstrap-server bigdata1:9092,bigdata2:9092,bigdata3:9092 --topic ChangeRecord --from-beginning
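To confirm that records are being distributed across all 4 partitions (an optional check; exact invocation depends on the Kafka version, this form uses the classic GetOffsetShell tool, which prints the latest offset of each partition):

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bigdata1:9092 --topic ChangeRecord --time -1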