现有需求需要将elasticsearch的备份至hdfs存储,根据以上需求,使用logstash按照天级别进行数据的同步
./bin/logstash-plugin install logstash-output-webhdfs
input{
elasticsearch{
hosts => "xxxx:9200"
index => "xxxx"
#自定义查询
query => '{"query": {"range": {"create_time":{"gte": 1704668760000,"lte": 1704668820000}}}}'
size => 10000
scroll => "5m"
slices => 1
user => "xxx"
password => "xxxx"
}
}
filter {
date {
#增加@timestamp,并将记录产生时间赋值给@timestamp,时间处理默认是按照@timestamp的时间
match => ["create_time","UNIX_MS"]
timezone => "Asia/Shanghai"
target => "@timestamp"
}
#增加一个timestamp,对@timestamp时间增加8小时
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
#将timestamp赋值给@timestamp
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
#设置导入到hdfs的文件数量,需要增加一个字段,当然也可以用时间来控制文件数量,但是只有固定的几个数字,此处按照3个文件控制
ruby {
code => "event.set('sync_bucket', event.get('created')%3)"
}
#删除上处增加的临时字段timestamp
mutate {
remove_field => ["timestamp"]
}
}
output {
webhdfs {
#高可哟集群需要配置standby
standby_host => "xxx"
standby_port => 9870
host => "xxxx"
port => 9870
path => "/hadoop/test/part_day=%{+YYYYMMdd}/logstash-%{sync_bucket}.log"
#按照时间控制文件生成数量,+a是上下午的意思
#path => "/hadoop/dm_dw/on/ods/ods_cc_es_initLogPro_di/part_day=%{+YYYYMMdd}/logstash-%{+a}.log"
user => "hadoop"
compression => "gzip"
idle_flush_time => 60
codec => "jsonlines"
}
}
logstash时间处理官网:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
比较不错的logstash介绍网站:https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/get_start/full_config.html