Deploying Kafka with docker-compose

Published: January 11, 2024

1. Basic Kafka deployment

Components: zookeeper, kafka, kafka-ui

docker-compose.yml

Note: 192.168.1.20 is the host machine's IP; it is used as the advertised listener so that clients outside the containers can reach the broker.

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    restart: always
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.20:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_PORT: 9092 
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms512M"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: always
    ports:
        - 10010:8080
    environment:
        - DYNAMIC_CONFIG_ENABLED=true
        - SERVER_SERVLET_CONTEXT_PATH=/ui-kafka
        - KAFKA_CLUSTERS_0_NAME=local
        - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
        - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
    depends_on:
      - zookeeper
      - kafka
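
After starting the stack with docker compose up -d, broker connectivity can be checked from Java before wiring up producers and consumers. The following is a minimal sketch, assuming the kafka-clients library is on the classpath and that 192.168.1.20:9092 (the advertised listener) is reachable from the machine running it; the class name is illustrative, and createTopics fails if topic_test already exists (the broker auto-creates topics by default, so pre-creating it is optional).

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Must be an address reachable from wherever this code runs (host or LAN).
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.20:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // List the brokers the client can see; for this compose file there is a single broker.
            admin.describeCluster().nodes().get()
                    .forEach(node -> System.out.println("broker: " + node));

            // Pre-create the test topic: 1 partition, replication factor 1 (single broker).
            NewTopic topic = new NewTopic("topic_test", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("topic_test created");
        }
    }
}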

kafka-ui

URL: http://localhost:10010/ui-kafka/ (the path matches SERVER_SERVLET_CONTEXT_PATH)

Java producer

The spring-kafka dependency below pulls in kafka-clients transitively; the example itself uses the plain kafka-clients producer API.

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>



import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) {


        Properties properties = new Properties();

        // Kafka broker address; the bootstrap address must be reachable from the client,
        // and the broker's advertised listener (192.168.1.20:9092) must be as well
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // KEY: used by Kafka to decide which partition of the topic a record is written to
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // VALUE: the actual message payload
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the Kafka producer with the properties above
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_test", i + " : testx123 test");
            // Send the message (asynchronous; records are batched in the client buffer)
            producer.send(record);
            System.out.println("sent: " + i);
        }
        // Close the producer; this flushes any buffered records
        producer.close();

    }
}
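
Note that producer.send() is asynchronous: the println above only confirms that the record was queued in the client buffer, not that the broker accepted it. Below is a minimal variant with a delivery callback, using the same connection settings as above; the class name is illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerCallbackTest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("topic_test", "key-1", "callback test");
            // The callback runs once the broker has acknowledged (or rejected) the record.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("acked: partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                }
            });
            producer.flush(); // ensure the record is sent and the callback runs before exit
        }
    }
}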

Java consumer

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {

    public static void main(String[] args) {


        Properties props = new Properties();
        // bootstrap.servers: Kafka broker address(es), comma-separated
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "topic-test-group"); // consumer group id
        props.put("auto.offset.reset", "earliest"); // start from the beginning when no committed offset exists
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        // Key/value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("topic_test")); // topic(s) to subscribe to
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            if (!records.isEmpty()) {
                try {
                    // Commit the consumed offsets synchronously
                    consumer.commitSync();
                } catch (CommitFailedException exception) {
                    System.out.println("commit failed....");
                }
            }

        }
    }
}
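
Since the declared dependency is spring-kafka, the same consumption logic can also be written as a Spring listener instead of a hand-written poll loop. Below is a minimal sketch, assuming a Spring Boot application with spring.kafka.bootstrap-servers pointing at the broker; the class name is illustrative, and the listener container handles polling and offset commits.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TopicTestListener {

    // Spring creates the consumer, polls, and commits offsets according to the
    // container's ack mode; this method only handles each record's payload.
    @KafkaListener(topics = "topic_test", groupId = "topic-test-group")
    public void onMessage(String value) {
        System.out.println("received: " + value);
    }
}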

2. Encrypted deployment

Source: https://blog.csdn.net/xixiyuguang/article/details/135518162