flume案例

发布时间:2024年01月22日

在构建数仓时,经常会用到flume接收日志数据,通常涉及到的组件为kafka,hdfs等。下面以一个flume接收指定topic数据,并存入hdfs的案例,大致了解下flume相关使用规则。

版本:1.9

Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。

属性名

默认值

解释

channels

与Source绑定的channel,多个用空格分开

type

组件类型,这个是: org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Source使用的Kafka集群实例列表

kafka.consumer.group.id

flume

消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组

kafka.topics

将要读取消息的目标 Kafka topic 列表,多个用逗号分隔

kafka.topics.regex

会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。

batchSize

1000

一批写入 channel 的最大消息数

batchDurationMillis

1000

一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。

backoffSleepIncrement

1000

当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

maxBackoffSleep

5000

Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

useFlumeEventFormat

false

默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。

setTopicHeader

true

当设置为 true 时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。

topicHeader

topic

如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。

kafka.consumer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXTSASL_SSLSSL ,有关安全设置的其他信息,请参见下文。

more consumer security props

如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置

Other Kafka Consumer Properties

其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

必需的参数已用 粗体 标明。

已经弃用的一些属性:

属性名

默认值

解释

topic

改用 kafka.topics

groupId

flume

改用 kafka.consumer.group.id

zookeeperConnect

自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

migrateZookeeperOffsets

true

如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。

?Channel

此处选择memory channel,内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。

属性

默认值

解释

type

组件类型,这个是: memory

capacity

100

内存中存储 Event 的最大数

transactionCapacity

100

source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

keep-alive

3

添加或删除一个 Event 的超时时间(秒)

byteCapacityBufferPercentage

20

指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比

byteCapacity

Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

Sink

HDFS Sink ,这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

属性名

默认值

解释

channel

与 Sink 连接的 channel

type

组件类型,这个是: hdfs

hdfs.path

HDFS目录路径(例如:hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Flume在HDFS文件夹下创建新文件的固定前缀

hdfs.fileSuffix

Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)

hdfs.inUsePrefix

Flume正在写入的临时文件前缀,默认没有

hdfs.inUseSuffix

.tmp

Flume正在写入的临时文件后缀

hdfs.emptyInUseSuffix

false

如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀

hdfs.rollInterval

30

当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒

hdfs.rollSize

1024

当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节

hdfs.rollCount

10

当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

hdfs.idleTimeout

0

关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒

hdfs.batchSize

100

向 HDFS 写入内容时每次批量操作的 Event 数量

hdfs.codeC

压缩算法。可选值:gzipbzip2lzolzop` 、 ``snappy

hdfs.fileType

SequenceFile

文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数

hdfs.maxOpenFiles

5000

允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭

hdfs.minBlockReplicas

指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。

hdfs.writeFormat

Writable

文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。

hdfs.threadsPoolSize

10

每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)

hdfs.rollTimerPoolSize

1

每个HDFS Sink实例调度定时文件滚动的线程数

hdfs.kerberosPrincipal

用于安全访问 HDFS 的 Kerberos 用户主体

hdfs.kerberosKeytab

用于安全访问 HDFS 的 Kerberos keytab 文件

hdfs.proxyUser

代理名

hdfs.round

false

是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)

hdfs.roundValue

1

向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30

hdfs.roundUnit

second

向下舍入的单位,可选值: secondminutehour

hdfs.timeZone

Local Time

解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai

hdfs.useLocalTimeStamp

false

使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)

hdfs.closeTries

0

开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。

如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;

如果设置为0,Sink会一直尝试重命名文件直到成功为止;

关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。

hdfs.retryInterval

180

连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。

serializer

TEXT

Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。

serializer.*

根据上面 serializer 配置的类型来根据需要添加序列化器的参数

完整案例如下:

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hmcs030:9092,hmcs031:9092,hmcs032:9092
a1.sources.r1.kafka.topics= hmcs_network_enterprise_climb
a1.sources.r1.kafka.consumer.group.id = hmcs_network_enterprise_climb_group
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hmcs.interceptor.DecodeInterceptor$Builder

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/enterprise/networkEnterData/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = climbNetworkEnter-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

启动命令如下:

nohup /usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/job/kafka_memory_hdfs.conf -n a1 -Dflume.root.logger=info,console >/usr/local/flume/logs/kafka_memory_hdfs.log 2>&1 &
文章来源:https://blog.csdn.net/SuperBoy_Liang/article/details/135748666
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。