创建网桥
docker network create app-tier --driver bridge
拉取并启动镜像
docker run -d --name kafka-server --hostname kafka-server \
--network app-tier \
-p 9092:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
注意:KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 中的 192.168.66.1 需要替换为自己服务器的 IP
创建一个名为 first 的主题(topic)
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092
查看一下主题列表
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"log"
"os"
"os/signal"
)
// prod publishes a single "Hello, Kafka!" message with key "key" to the
// "first" topic on the broker at 192.168.66.1:9092, then closes the writer.
//
// main runs prod in its own goroutine, so failures are logged and prod
// returns instead of calling log.Fatal: exiting the process from here would
// skip main's deferred cleanup. The writer is closed on every path.
func prod() {
	// Kafka broker address list.
	brokerList := []string{"192.168.66.1:9092"}

	// Writer bound to the "first" topic.
	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokerList,
		Topic:    "first",
		Balancer: &kafka.LeastBytes{},
	})

	// The message to send.
	message := kafka.Message{
		Key:   []byte("key"),
		Value: []byte("Hello, Kafka!"),
	}

	if err := producer.WriteMessages(context.Background(), message); err != nil {
		log.Printf("failed to write messages: %v", err)
		// Still release the writer's resources on the failure path.
		if cerr := producer.Close(); cerr != nil {
			log.Printf("failed to close writer: %v", cerr)
		}
		return
	}

	if err := producer.Close(); err != nil {
		log.Printf("failed to close writer: %v", err)
		return
	}

	fmt.Println("Message sent successfully!")
}
// main starts the producer in the background and then consumes messages from
// partition 0 of the "first" topic, printing each value until the process
// receives an interrupt signal (Ctrl-C).
func main() {
	go prod()

	// Kafka broker address and topic name.
	brokerAddress := "192.168.66.1:9092"
	topic := "first"

	// Dial the partition leader. The connection is not used for reading below;
	// it only makes the program fail fast when the broker cannot be reached.
	conn, err := kafka.DialLeader(context.Background(), "tcp", brokerAddress, topic, 0)
	if err != nil {
		log.Fatalf("Failed to connect to Kafka broker: %s", err)
	}
	defer conn.Close()

	// Resetting the consumer start offset to the latest is left disabled.
	//conn.ResetOffsets()

	// ctx is cancelled when an interrupt arrives. Passing it to ReadMessage
	// lets a blocked read be aborted immediately, instead of the signal only
	// being noticed after the next message is delivered.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Create the consumer.
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{brokerAddress},
		Topic:     topic,
		Partition: 0,
		MinBytes:  10e3, // minimum bytes
		MaxBytes:  10e6, // maximum bytes
	})
	defer func() {
		if cerr := consumer.Close(); cerr != nil {
			log.Printf("Failed to close reader: %s", cerr)
		}
	}()

	// Consume messages until interrupted.
	for {
		msg, err := consumer.ReadMessage(ctx)
		if err != nil {
			// A cancelled context means the interrupt signal was received.
			if ctx.Err() != nil {
				log.Println("Received interrupt signal, shutting down...")
				return
			}
			log.Printf("Failed to read message: %s", err)
			continue
		}

		// Handle the message.
		fmt.Printf("Received message: %s\n", string(msg.Value))
	}
}
上图
Reference
https://hub.docker.com/r/bitnami/kafka