对于flume的使用,主要就涉及source、channel、sink的配置。内置的可以满足当下我们的需求,这里选用source是 TAILDIR,channel是 SPILLABLEMEMORY,sink是org.apache.flume.sink.kafka.KafkaSink。如果要对数据进行过滤等操作,可以添加配置interceptors,内置的如果无法满足功能,可以自定义interceptor,即实现org.apache.flume.interceptor.Interceptor接口即可,然后把自定义的jar包放置到$FLUME_HOME/lib 下。
实现nginx日志上传kafka的完整flume配置如下:
bx.sources = r1
bx.sinks = k1
bx.channels = c1
bx.sources.r1.type = TAILDIR
bx.sources.r1.channels = c1
bx.sources.r1.positionFile = /opt/software/md_nginx_log/flume/taildir_position.json
bx.sources.r1.filegroups = f1
bx.sources.r1.filegroups.f1 = /data/logs/nginx/access.log
bx.sources.r1.fileHeader = true
bx.sources.r1.maxBatchCount = 1000
bx.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
bx.sinks.k1.channel = c1
bx.sinks.k1.kafka.topic = md_nginx_log
bx.sinks.k1.kafka.bootstrap.servers = xxx:9092
bx.sinks.k1.kafka.flumeBatchSize = 20
bx.sinks.k1.kafka.producer.acks = 1
bx.sinks.k1.kafka.producer.linger.ms = 10
bx.sinks.k1.kafka.producer.compression.type = snappy
bx.channels.c1.type = SPILLABLEMEMORY
bx.channels.c1.memoryCapacity = 100000
bx.channels.c1.overflowCapacity = 100000000
bx.channels.c1.byteCapacity = 500000000
bx.channels.c1.checkpointDir = /opt/software/md_nginx_log/flume/checkpoint
bx.channels.c1.dataDirs = /opt/software/md_nginx_log/flume/data
假如有自定义的interceptor需要应用,名为 com.bg.flume.interceptor.CustomInterceptor,则在配置文件里多添加以下两行:
bx.sources.r1.interceptors = i1
bx.sources.r1.interceptors.i1.type = com.bg.flume.interceptor.CustomInterceptor$Builder
然后启动flume命令如下:
nohup bash $FLUME_HOME/bin/flume-ng agent -c $FLUME_HOME/conf -Xmx2000m -f /opt/software/md_nginx_log/md_nginx_log.properties?-n bx &
这里一定要注意这个 -n(或--name)后面接的值一定是配置文件里的那个前缀值,即bx,写成其他就会导致找不到配置,程序运行起来了但是无法正确运行。
TAILDIR 说明
实时监测配置的文件,一旦检测到每个文件添加了新行,就几乎实时地跟踪它们。如果新行正在写入,该源将重试读取它们,等待写入完成。该源是可靠的,即使文件rotate也不会丢失数据(因为他记录的是文件的inode不是文件名)。它定期将每个文件的最后一个读位置以JSON格式写入给定位置文件。如果Flume由于某种原因停止或停机,它可以从写入现有位置文件的位置重新读取。
配置项说明:
SPILLABLEMEMORY说明
该channel的特点就是先使用内存,内存不足就溢写到磁盘。事件存储在内存中的队列中,也存储在磁盘上。内存队列用作主存储,磁盘用作溢出。磁盘存储使用嵌入式文件通道进行管理。当内存队列满时,其他传入的事件将存储在file channel中。这种通道非常适合那些在正常运行时需要高吞吐量的内存通道,同时又需要更大容量的文件通道以更好地容忍间歇sink端中断或消耗率下降的流量。在这种异常情况下,吞吐量将降低到大约文件信道速度。当代理崩溃或重启时,只有存储在磁盘上的事件才能在代理联机时恢复。即在极端情况下可能是会丢失数据的,这里对于不重要的nginx日志是可行的,性能更好。对于不容许丢失的数据则使用 file channel。
因为nginx日志不会自动切分,所以日志会无限增大,影响读取等性能。这里使用logrotate来实现nginx日志的每日切分。
这里配置nginx切分配置文件,比如ceontos操作系统等,默认在?/etc/logrotate.d/nginx
没有就自己创建该文件,配置内容如下:
/etc/nginx/logs/*.log {
daily
missingok
rotate 3 #保留3个文件
compress
delaycompress
notifempty
sharedscripts
postrotate
if [ -f /var/run/nginx.pid ]; then #注意该pid文件是你具体的nginx的pid文件位置
kill -USR1 `cat /var/run/nginx.pid`
fi
endscript
}