????????Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。
Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.4.0</version>
</dependency>
????????现在, 我们使用Kafka提供的Producer类,如何发送消息。
代码:
public class MyProducerTest { private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092"; private static final String TOPIC = "disTopic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //PART1:设置发送者相关属性 Properties props = new Properties(); // 此处配置的是kafka的端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor"); // 配置key的序列化类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 配置value的序列化类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0; i < 5; i++) { //Part2:构建消息 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i); //Part3:发送消息 //单向发送:不关心服务端的应答。 producer.send(record); System.out.println("message "+i+" sended"); } //消息处理完才停止发送者。 producer.close(); } }
执行结果:
代码:
public class MyProducerTest { private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092"; private static final String TOPIC = "disTopic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //PART1:设置发送者相关属性 Properties props = new Properties(); // 此处配置的是kafka的端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor"); // 配置key的序列化类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 配置value的序列化类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0; i < 5; i++) { //Part2:构建消息 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i); //Part3:发送消息 //同步发送:获取服务端应答消息前,会阻塞当前线程。 RecordMetadata recordMetadata = producer.send(record).get(); String topic = recordMetadata.topic(); int partition = recordMetadata.partition(); long offset = recordMetadata.offset(); String message = recordMetadata.toString(); System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset); } //消息处理完才停止发送者。 producer.close(); } }
执行结果:
代码:
public class MyProducerTest { private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092"; private static final String TOPIC = "disTopic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //PART1:设置发送者相关属性 Properties props = new Properties(); // 此处配置的是kafka的端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor"); // 配置key的序列化类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 配置value的序列化类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); CountDownLatch latch = new CountDownLatch(5); for(int i = 0; i < 5; i++) { //Part2:构建消息 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i); //Part3:发送消息 //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(null != e){ System.out.println("消息发送失败,"+e.getMessage()); e.printStackTrace(); }else{ String topic = recordMetadata.topic(); long offset = recordMetadata.offset(); String message = recordMetadata.toString(); System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset); } latch.countDown(); } }); } //消息处理完才停止发送者。 latch.await(); //消息处理完才停止发送者。 producer.close(); } }
执行结果:
? 从上述示例中,我们可以总结出,构建Producer分为三个步骤:
????????接下来可以使用Kafka提供的Consumer类,快速消费消息。
代码:
public class MyConsumerTest { private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092"; private static final String TOPIC = "disTopic"; public static void main(String[] args) { //PART1:设置发送者相关属性 Properties props = new Properties(); //kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //每个消费者要指定一个group props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); //key序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //value序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(TOPIC)); while (true) { //PART2:拉取消息 // 100毫秒超时时间 ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100)); //PART3:处理消息 for (ConsumerRecord<String, String> record : records) { System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value()); } //提交offset,消息就不会重复推送。 consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。 // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。 } } }
? 整体来说,Consumer同样是分为三个步骤:
内容更新中