Docker搭建kafka集群

发布时间:2023年12月29日

Docker搭建kafka集群

kafka概念

  • broker:消息中间件处理节点,一个broker就是一个kafka节点,一个或者多个broker就组成了一个kafka集群
  • topic:kafka根据topic对消息进行归类,发布到kafka集群的每个消息,都要指定一个topic
  • producer:消息生产者,向broker发送消息的客户端
  • consumer:消息消费者,从broker读取消息的客户端

kafka特性描述

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时,也是通过offset来描述当前要消费的那条消息的位置

消息相关

  • 如果多个消费者在同一个消费者组中,那么只有一个消费者可以收到订阅topic中的消息,换言之,同一个消费组中只有一个消费者能收到一个topic中的消息
  • 多播消息:不同的消费组订阅同一个topic,不同的消费组中只有一个消费者能收到消息,实际上也是多个消费组中的多个消费者收到了消息

Controller、Rebalance、HW

Controller

  • Kafka集群中的broker在zk中创建节点的时候,会有一个临时节点序号,序号最小的节点,会被当做集群的controller,负责管理集群中的所有分区和副本的状态
  • 当某个分区的leader副本出现故障,由控制器负责为该分区选举新的leader副本
  • 当检测到某个分区的ISR集合发生变化的时候,由控制器负责通知所有的broker更新其元数据信息
  • 当使用kafka-topic.sh脚本为某个topic增加分区数量的时候,同样还是由控制器负责让新分区被其它节点感知到

Rebalance

  • 前提是消费者没有指定分区进行消费,当消费组中的消费者或者分区关系发生变化的时候,就会触发rebalance机制,这个机制会调整消费者消费哪个分区
  • 在触发rebalance机制之前,消费者消费哪个分区有三种策略:
    • range:通过公示来计算某个消费者消费哪个分区
    • 轮询:所有消费者轮着消费
    • sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整

HW和LEO

  • LEO是某个副本最后消息的消息位置(log-end-offset)
  • HW是已完成同步的位置,消息在写入broker时,且每个broker都完成了这条消息的同步后,hw才会变化,这之前,消费者是消费不到这条消息的,同步完成后,HW调整后,消费者才能消费这条消息,这样做是为了方式消息丢失

kafka消息积压问题

  • 消息积压问题的出现:消息的消费者的消费速度远远赶不上生产者生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的消息越来越多,消费者寻址的性能越来越差,最后导致整个kafka对外提供的服务的性能越来越差,从而造成其它服务的访问速度很慢,造成服务雪崩。
  • 消息积压的解决方案:
    • 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息;
      创建多个消费组,多个消费者,部署到其它机器上,一起消费,提高消费者消费消息的速度;
    • 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者,该消费者poll下来的消息,直接转发到新的主题上,使用多个消费者消费新主题的消息–该方法不常用

Docker 搭建kafka集群

  • docker下载kafka镜像
docker search kafka
docker pull bitnami/kafka
  • 启动3个kafka节点,组成集群
docker run -d --name kafka1 --network mynetwork \
 -p 9092:9092 \
 --env KAFKA_BROKER_ID=0 \
 --env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.5:9092 \
 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka 
 
 docker run -d --name kafka2 --network mynetwork \
 -p 9093:9092 \
 --env KAFKA_BROKER_ID=1 \
 --env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.6:9092 \
 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka 
 
 docker run -d --name kafka3 --network mynetwork \
 -p 9094:9092 \
 --env KAFKA_BROKER_ID=2 \
 --env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.7:9092 \
 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka 
  • 启动kafka集群节点
docker start kafka1
docker start kafka2
docker start kafka3
  • springboot引用kafka的生产者和消费者
#springboot中kafka的配置信息
server:
  port: 8080
  servlet:
    context-path: /
spring:
  application:
    name: mvcLearn
#  mvc:
#    static-path-pattern: /static/**
  web:
    resources:
      static-locations:
        - classpath:/hwc/
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    producer:
      acks: 1
      retries: 3
      batch-size: 16384
      buffer-memory:  33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate
//Kafka生产者controller,接收前段消息,发送至kafka集群
package com.huwc.mvclearn.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MyKafkaControlller {

    private final static String TOPIC_NAME = "my_two_partition_topic";

    @Autowired
    private KafkaTemplate<String, String> template ;


    @GetMapping("/send/{msg}")
    public String sendMessage(@PathVariable("msg") String msg){
        template.send(TOPIC_NAME, 0, "key", msg);

        return "send success" ;
    }
}


/**
*kafka消费者,使用@KafkaListener注解注册一个消费者
*
**/
package com.huwc.mvclearn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    @KafkaListener(topics = "my_two_partition_topic", groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack){
        String key = record.key();
        String value = record.value();

        System.out.println("key = " + key);
        System.out.println("value = " + value);
        System.out.println("record = " + record);

        ack.acknowledge();
    }
}



文章来源:https://blog.csdn.net/whhwch1986/article/details/135298434
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。