? ? ? ? 导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。
????????Apache Flink?是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。
?----?Apache Flink 官方文档?
// 创建Flink流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置数据源,这里假设是某个文件
DataStream<String> text = env.readTextFile("path/to/text");
// 定义数据处理操作
DataStream<String> processed = text
.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
// 实现一些转换逻辑
return "Processed: " + value;
}
});
// 执行数据流
env.execute("Flink DataStream Example");
????????Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。
?---- 维基百科?
????????在Kafka中,生产者(producer)将消息发送给Broker,Broker将生产者发送的消息存储到磁盘当中,而消费者(Consumer)负责从Broker订阅并且消费消息,消费者(Consumer)使用pull这种模式从服务端拉取消息。而zookeeper是负责整个集群的元数据管理与控制器的选举。
// Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "message key", "message value"));
// Kafka消费者
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
????????在开始之前,确保你的开发环境中安装了Apache Flink和Apache Kafka。Flink提供了与Kafka集成的连接器,可以轻松地从Kafka读取数据并将数据写回Kafka。
要使Flink应用能够从Kafka消费数据,需要使用Flink提供的Kafka连接器。
创建一个Flink应用程序,从名为"topic-name"的Kafka主题中消费数据,并打印出来。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"topic-name", new SimpleStringSchema(), properties);
// 将消费者添加到数据流
DataStream<String> stream = env.addSource(consumer);
stream.print();
env.execute("Flink Kafka Integration");
}
}
一旦从Kafka接收数据流,可以利用Flink提供的各种操作对数据进行处理。
我们对从Kafka接收到的每条消息进行了简单的处理,并输出处理后的结果。
DataStream<String> processedStream = stream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
return "Processed: " + value;
}
});
processedStream.print();
除了从Kafka消费数据外,Flink还可以将处理后的数据流发送回Kafka。我们可以创建一个Flink生产者实例,并将处理后的数据流发送到名为"output-topic"的Kafka主题。
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
// ...
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
"output-topic", new SimpleStringSchema(), properties);
processedStream.addSink(producer);
env.setParallelism(4);
env.enableCheckpointing(10000); // 每10000毫秒进行一次checkpoint
----------------
觉得有用欢迎点赞收藏~ 欢迎评论区交流~