Flume 自带的 Source 有 Avro、Thrift、Netcat、Taildir、Kafka、HTTP 等。但在有些场合,比如需要访问指定接口获取数据并将其作为 Flume 的数据来源时,这类定制化的 Source 需要我们自己实现。下面介绍如何自定义实现 Source。
本例使用自定义 Source 访问自身的 Web 服务,并将获取到的数据发送至 logger Sink。
<!-- Flume core library: provides the Source / PollableSource / Context / Event APIs used by the custom source. -->
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- The Spring Boot Maven plugin is declared but disabled via <skip>true</skip>. -->
<!-- NOTE(review): presumably this is so the build yields a plain jar that can be dropped into Flume's lib directory instead of a repackaged Spring Boot jar; confirm against the project's packaging. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Custom Flume source that polls an HTTP endpoint with GET and forwards each
 * response body to the channel as a single event.
 *
 * <p>The endpoint URL is read from the {@code path} property of the source
 * configuration (default {@code http://baidu.com}).
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger log = LoggerFactory.getLogger(MySource.class);

    /** Pause after every successful poll, so the endpoint is called roughly every 2s. */
    private static final long POLL_INTERVAL_MS = 2000L;

    /** Extra sleep added per consecutive BACKOFF result. */
    private static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L;

    /** Upper bound for the back-off sleep. */
    private static final long MAX_BACKOFF_SLEEP_MS = 5000L;

    /** URL of the endpoint to poll; assigned in {@link #configure(Context)}. */
    private String path;

    /**
     * Performs one poll: GETs {@link #path}, joins the response lines (each
     * followed by a tab) into one string, and delivers it as an event.
     *
     * @return {@code Status.READY} when an event was delivered,
     *         {@code Status.BACKOFF} when the poll failed or was interrupted
     * @throws EventDeliveryException declared by the interface; failures are
     *         currently reported through {@code Status.BACKOFF} instead
     */
    @Override
    public Status process() throws EventDeliveryException {
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) new URL(this.path).openConnection();
            connection.setRequestMethod("GET");
            connection.connect();

            StringBuilder body = new StringBuilder();
            // try-with-resources closes the reader (and the underlying stream) on every path.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line).append("\t");
                }
            }

            String payload = body.toString();
            log.info("test =======================: {}", payload);
            // Encode with the same charset the response was decoded with,
            // rather than the platform default.
            Event event = EventBuilder.withBody(payload.getBytes(StandardCharsets.UTF_8));
            getChannelProcessor().processEvent(event);

            // 2s between calls.
            Thread.sleep(POLL_INTERVAL_MS);
            return Status.READY;
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("出错了!, {}", ex.getMessage());
            return Status.BACKOFF;
        } catch (Exception ex) {
            // Trailing throwable argument makes SLF4J log the full stack trace.
            log.error("出错了!, {}", ex.getMessage(), ex);
            return Status.BACKOFF;
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * @return milliseconds added to the back-off sleep per consecutive
     *         BACKOFF; non-zero so a failing endpoint is not retried in a
     *         tight loop
     */
    @Override
    public long getBackOffSleepIncrement() {
        return BACKOFF_SLEEP_INCREMENT_MS;
    }

    /** @return maximum back-off sleep in milliseconds */
    @Override
    public long getMaxBackOffSleepInterval() {
        return MAX_BACKOFF_SLEEP_MS;
    }

    /**
     * Reads the endpoint URL from the source configuration.
     *
     * @param context source configuration; the {@code path} key holds the URL
     *        to poll and defaults to {@code http://baidu.com}
     */
    @Override
    public void configure(Context context) {
        String path = context.getString("path", "http://baidu.com");
        log.info("path ==========================: {}", path);
        this.path = path;
    }
}
vim flume-self-source.conf
# Agent a1: one source (r1), one channel (c1), one sink (k1).
a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
# Fully-qualified class name of the custom Source.
# Comments must be on their own line: in a properties file a trailing "# ..."
# is part of the value and would corrupt the class name / URL.
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource
# Custom parameter, read by MySource.configure() through context.getString("path", ...).
a1.sources.r1.path = http://192.168.30.33:8088/hello
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sinks.k1.type = logger
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start the agent named a1 (the prefix used in the config file) with the config at testconf/flume-self-source.conf.
# -c is the Flume conf directory, -n the agent name, -f the agent config file; the -D option sets the flume.root.logger property (INFO level, console).
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source.conf -Dflume.root.logger=INFO,console
本文详细介绍了如何在 Flume 中实现自定义 Source,希望能帮助大家进一步了解 Flume 的使用。
本人从小白开始自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得。想获取自学总结资料(PDF 版本)或者希望共同学习的朋友,可关注微信公众号:it自学社团,后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)