docker kafka go demo

发布时间:2024年01月05日

配置

创建网桥

docker network create app-tier --driver bridge

拉取并启动镜像

docker run -d --name kafka-server --hostname kafka-server \
    --network app-tier \
    -p 9092:9092 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    bitnami/kafka:latest

KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092,高亮位置为自己的服务器ip

创建一个first分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092

查看一下分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092

Go生产与消费kafka中的消息

package main

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"os"
	"os/signal"
)

func prod() {
	// 设置 Kafka 代理地址
	brokerList := []string{"192.168.66.1:9092"}

	// 创建一个 Kafka 生产者
	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokerList,
		Topic:    "first",
		Balancer: &kafka.LeastBytes{},
	})

	// 待发送的消息
	message := kafka.Message{
		Key:   []byte("key"),
		Value: []byte("Hello, Kafka!"),
	}

	// 发送消息
	err := producer.WriteMessages(context.Background(), message)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭 Kafka 生产者
	err = producer.Close()
	if err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully!")
}

func main() {

	go prod()
	// 设置 Kafka broker 地址和主题名称
	brokerAddress := "192.168.66.1:9092"
	topic := "first"

	// 创建 Kafka 连接
	conn, err := kafka.DialLeader(context.Background(), "tcp", brokerAddress, topic, 0)
	if err != nil {
		log.Fatalf("Failed to connect to Kafka broker: %s", err)
	}
	defer conn.Close()

	// 设置消费者起始偏移量为最新
	//conn.ResetOffsets()
	
	// 创建消费者
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{brokerAddress},
		Topic:     topic,
		Partition: 0,
		MinBytes:  10e3, // 最小字节数
		MaxBytes:  10e6, // 最大字节数
	})

	// 创建一个信号通道,用于捕获中断信号
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// 开始消费消息
	for {
		select {
		case <-signals:
			log.Println("Received interrupt signal, shutting down...")
			return
		default:
			// 从 Kafka 获取一条消息
			msg, err := consumer.ReadMessage(context.Background())
			if err != nil {
				log.Printf("Failed to read message: %s", err)
				continue
			}

			// 处理消息
			fmt.Printf("Received message: %s\n", string(msg.Value))
		}
	}
}

上图
在这里插入图片描述

Reference
https://hub.docker.com/r/bitnami/kafka

文章来源:https://blog.csdn.net/dawnto/article/details/135398170
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。