zookeeper,kafka,kafka-ui
注意点:192.168.1.20 是宿主机的ip
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: wurstmeister/kafka
restart: always
container_name: kafka
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 0
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.20:9092
KAFKA_ADVERTISED_PORT: 9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_PORT: 9092
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_HEAP_OPTS: "-Xmx512M -Xmx512M"
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
restart: always
ports:
- 10010:8080
environment:
- DYNAMIC_CONFIG_ENABLED=true
- SERVER_SERVLET_CONTEXT_PATH=/ui-kafka
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
depends_on:
- zookeeper
- kafka
地址:http://localhost:10010/ui-kafka/
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// VALUE: 实际发送消息的内容
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2.创建kafka生产者对象 传递properties属性参数集合
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic_test", i + " : testx123测试");
// 4.发送消息
producer.send(record);
System.out.println("发送成功: " + i);
}
// 5.关闭生产者
producer.close();
}
}
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
// bootstrap.servers:kafka服务器地址,多个用逗号隔开
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "topic-test-group"); // 消费组groupId
props.put("auto.offset.reset", "earliest");
// 序列化方式
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic_test")); // 订阅的topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("主题 = %s, 分区 = %d, 位移 = %d, " + "消息键 = %s, 消息值 = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
if (!records.isEmpty()) {
try {
// 提交消费位移
consumer.commitSync();
} catch (CommitFailedException exception) {
System.out.println("commit failed....");
}
}
}
}
}