springboot集成kafka消费数据

发布时间:2024年01月15日

springboot集成kafka消费数据

1.引入pom依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.2</version>
        </dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    KafkaIotCustomProperties kafkaIotCustomProperties;


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 并发数 多个微服务实例会均分
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        // 是否设置手动提交
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // 服务器地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());
        // 是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());
        // 自动提交间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());
        //会话时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());
        //key序列化
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());
        //value序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());
        // 心跳时间
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());

        // 分组id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());
        //消费策略
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());
        // poll记录数
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());
        //poll时间
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());
        return propsMap;
    }

}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {

    private List<String> topics;

    private String groupId;

    private String sessionTimeOut;

    private String bootstrapServers;

    private String autoOffsetReset;

    private boolean enableAutoCommit;

    private String autoCommitInterval;

    private String fetchMinSize;

    private String fetchMaxWait;

    private String maxPollRecords;

    private String maxPollInterval;

    private String heartbeatInterval;

    private String keyDeserializer;

    private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:
  realdata:
    kafka:
      bootstrapServers:  192.168.80.251:9092
      topics: ["test1","test2"]
      groupId: shengtingrealdatagroup
      #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance
      sessionTimeOut: 30000
      #      autoOffsetReset: earliest
      #取消自动提交,即便如此 spring会帮助我们自动提交
      enableAutoCommit: false
      #自动提交间隔
      autoCommitInterval: 1000
      #拉取的最小字节
      fetchMinSize: 1
      #拉去最小字节的最大等待时间
      fetchMaxWait: 500
      maxPollRecords: 50
      #300秒的提交间隔,如果程序大于300秒提交,会报错
      maxPollInterval: 300000
      #心跳间隔
      heartbeatInterval: 10000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {


    @Autowired
    private KafkaIotCustomProperties kafkaIotCustomProperties;


    @KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());
            //手动提交偏移量
            ack.acknowledge();

        }
    }
}
文章来源:https://blog.csdn.net/weixin_47874230/article/details/135589570
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。