Integrating Kafka with Spring Boot to consume data
1. Add the POM dependencies
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.2</version>
</dependency>
2. Add the configuration
2.1. Add KafkaConsumerConfig.java
@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    KafkaIotCustomProperties kafkaIotCustomProperties;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Three concurrent consumer threads; records are delivered to the listener in batches.
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        // Offsets are committed only when the listener calls Acknowledgment#acknowledge().
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("Consumer configuration: {}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());
        return propsMap;
    }
}
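If processing inside the listener can throw, it is worth giving the container factory an error handler so a bad batch is retried a bounded number of times instead of being redelivered forever. The line below is an optional sketch, not part of the original configuration; it assumes spring-kafka 2.8+, where DefaultErrorHandler (org.springframework.kafka.listener) and FixedBackOff (org.springframework.util.backoff) are available, and it would go inside kafkaListenerContainerFactory() before the factory is returned.
// Optional: retry a failed delivery twice, one second apart, then log it and move on.
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));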
2.2. Add KafkaIotCustomProperties.java
@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {
    private List<String> topics;
    private String groupId;
    private String sessionTimeOut;
    private String bootstrapServers;
    private String autoOffsetReset;
    private boolean enableAutoCommit;
    private String autoCommitInterval;
    private String fetchMinSize;
    private String fetchMaxWait;
    private String maxPollRecords;
    private String maxPollInterval;
    private String heartbeatInterval;
    private String keyDeserializer;
    private String valueDeserializer;
}
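Since both the consumer factory and the listener read their settings from this bean, an optional fail-fast check makes a missing key show up at startup rather than at the first poll. The snippet below is a sketch only, assuming a Spring Boot 2.x classpath where javax.annotation.PostConstruct is available; the method would be added inside KafkaIotCustomProperties.
// Optional startup validation (sketch); uses org.springframework.util.StringUtils.
@PostConstruct
public void validate() {
    if (!StringUtils.hasText(bootstrapServers)) {
        throw new IllegalStateException("fxyh.realdata.kafka.bootstrapServers must be set");
    }
    if (topics == null || topics.isEmpty()) {
        throw new IllegalStateException("fxyh.realdata.kafka.topics must contain at least one topic");
    }
}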
2.3. Add the application.yml configuration
fxyh:
  realdata:
    kafka:
      bootstrapServers: 192.168.80.251:9092
      topics: ["test1","test2"]
      groupId: shengtingrealdatagroup
      sessionTimeOut: 30000
      enableAutoCommit: false
      autoCommitInterval: 1000
      fetchMinSize: 1
      fetchMaxWait: 500
      maxPollRecords: 50
      maxPollInterval: 300000
      heartbeatInterval: 10000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      autoOffsetReset: latest
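To confirm that these keys bind to KafkaIotCustomProperties as expected, a small context test can be used. This is an optional sketch, not part of the original post: it assumes spring-boot-starter-test (JUnit 5) is on the test classpath, and the test class name is illustrative.
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class KafkaIotCustomPropertiesTest {

    @Autowired
    private KafkaIotCustomProperties properties;

    @Test
    void bindsYamlValues() {
        Assertions.assertEquals("192.168.80.251:9092", properties.getBootstrapServers());
        Assertions.assertEquals(2, properties.getTopics().size());
        Assertions.assertFalse(properties.isEnableAutoCommit());
    }
}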
3. Consumer code
@Slf4j
@Component
public class DeviceDataConsumer {

    @Autowired
    private KafkaIotCustomProperties kafkaIotCustomProperties;

    // Entries in the "properties" attribute must be "key=value" strings.
    @KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"},
            groupId = "#{@kafkaIotCustomProperties.groupId}",
            containerFactory = "kafkaListenerContainerFactory",
            properties = {"auto.offset.reset=#{@kafkaIotCustomProperties.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test consumed: topic: {}, groupId: {}, message: {}",
                    record.topic(), kafkaIotCustomProperties.getGroupId(), record.value());
        }
        // With a batch listener, acknowledge once after the whole batch has been processed;
        // MANUAL_IMMEDIATE then commits the offsets of every record in the batch.
        ack.acknowledge();
    }
}
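If one record in a batch cannot be processed, acknowledging the whole batch would silently drop it. With this manual-ack batch setup, one option is Acknowledgment#nack, which commits the records before the failing index and asks the container to redeliver from that record onward. Below is a minimal sketch of an alternative listener body, assuming spring-kafka 2.8 where nack(int index, long sleepMillis) is available; handle() is a hypothetical processing method.
for (int i = 0; i < records.size(); i++) {
    ConsumerRecord<String, String> record = records.get(i);
    try {
        handle(record); // hypothetical business logic
    } catch (Exception e) {
        log.error("Processing failed at offset {}, requesting redelivery", record.offset(), e);
        // Records before index i are committed; this record and the rest are redelivered after 1s.
        ack.nack(i, 1000);
        return;
    }
}
// Everything processed: commit the offsets for the whole batch.
ack.acknowledge();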