Docker搭建kafka集群
kafka概念
- broker:消息中间件处理节点,一个broker就是一个kafka节点,一个或者多个broker就组成了一个kafka集群
- topic:kafka根据topic对消息进行归类,发布到kafka集群的每个消息,都要指定一个topic
- producer:消息生产者,向broker发送消息的客户端
- consumer:消息消费者,从broker读取消息的客户端
kafka特性描述
- 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时,也是通过offset来描述当前要消费的那条消息的位置
消息相关
- 如果多个消费者在同一个消费者组中,那么只有一个消费者可以收到订阅topic中的消息,换言之,同一个消费组中只有一个消费者能收到一个topic中的消息
- 多播消息:不同的消费组订阅同一个topic,不同的消费组中只有一个消费者能收到消息,实际上也是多个消费组中的多个消费者收到了消息
Controller、Rebalance、HW
Controller
- Kafka集群中的broker在zk中创建节点的时候,会有一个临时节点序号,序号最小的节点,会被当做集群的controller,负责管理集群中的所有分区和副本的状态
- 当某个分区的leader副本出现故障,由控制器负责为该分区选举新的leader副本
- 当检测到某个分区的ISR集合发生变化的时候,由控制器负责通知所有的broker更新其元数据信息
- 当使用kafka-topic.sh脚本为某个topic增加分区数量的时候,同样还是由控制器负责让新分区被其它节点感知到
Rebalance
- 前提是消费者没有指定分区进行消费,当消费组中的消费者或者分区关系发生变化的时候,就会触发rebalance机制,这个机制会调整消费者消费哪个分区
- 在触发rebalance机制之前,消费者消费哪个分区有三种策略:
- range:通过公示来计算某个消费者消费哪个分区
- 轮询:所有消费者轮着消费
- sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整
HW和LEO
- LEO是某个副本最后消息的消息位置(log-end-offset)
- HW是已完成同步的位置,消息在写入broker时,且每个broker都完成了这条消息的同步后,hw才会变化,这之前,消费者是消费不到这条消息的,同步完成后,HW调整后,消费者才能消费这条消息,这样做是为了方式消息丢失
kafka消息积压问题
- 消息积压问题的出现:消息的消费者的消费速度远远赶不上生产者生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的消息越来越多,消费者寻址的性能越来越差,最后导致整个kafka对外提供的服务的性能越来越差,从而造成其它服务的访问速度很慢,造成服务雪崩。
- 消息积压的解决方案:
- 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息;
创建多个消费组,多个消费者,部署到其它机器上,一起消费,提高消费者消费消息的速度; - 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者,该消费者poll下来的消息,直接转发到新的主题上,使用多个消费者消费新主题的消息–该方法不常用
Docker 搭建kafka集群
docker search kafka
docker pull bitnami/kafka
docker run -d --name kafka1 --network mynetwork \
-p 9092:9092 \
--env KAFKA_BROKER_ID=0 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.5:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka
docker run -d --name kafka2 --network mynetwork \
-p 9093:9092 \
--env KAFKA_BROKER_ID=1 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.6:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka
docker run -d --name kafka3 --network mynetwork \
-p 9094:9092 \
--env KAFKA_BROKER_ID=2 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.228.7:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka
docker start kafka1
docker start kafka2
docker start kafka3
- springboot引用kafka的生产者和消费者
server:
port: 8080
servlet:
context-path: /
spring:
application:
name: mvcLearn
web:
resources:
static-locations:
- classpath:/hwc/
kafka:
bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
producer:
acks: 1
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate
package com.huwc.mvclearn.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyKafkaControlller {
private final static String TOPIC_NAME = "my_two_partition_topic";
@Autowired
private KafkaTemplate<String, String> template ;
@GetMapping("/send/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
template.send(TOPIC_NAME, 0, "key", msg);
return "send success" ;
}
}
package com.huwc.mvclearn.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaConsumer {
@KafkaListener(topics = "my_two_partition_topic", groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack){
String key = record.key();
String value = record.value();
System.out.println("key = " + key);
System.out.println("value = " + value);
System.out.println("record = " + record);
ack.acknowledge();
}
}