首先,你需要在云服务器上安装Docker和Docker Compose。然后,创建一个新的目录来存放你的Docker Compose配置文件。
在这个目录下创建一个名为docker-compose.yml
的文件,并将以下内容复制到文件中:
version: '3.7'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_LISTENERS: PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test_topic:1:1"
这个Docker Compose文件将会启动一个Zookeeper和一个Kafka容器。注意,我们在Kafka容器中设置了一个环境变量来指定Kafka的主机名。
保存并关闭文件后,在终端中使用以下命令启动Kafka和Zookeeper容器:
docker-compose up -d
等待一段时间直到容器启动完成。
接下来,你可以编写使用Kafka的Java示例代码。这里是一个简单的示例:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaExample {
private static final String TOPIC = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// 生产者示例
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>(TOPIC, "Hello, Kafka!"));
producer.close();
// 消费者示例
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s\n", record.key(), record.value());
}
consumer.close();
}
}
以上代码包括一个简单的Kafka生产者和一个消费者。它们使用的Kafka主题名称为test_topic
,Kafka的服务器地址为localhost:9092
。
你可以将以上代码保存到一个名为KafkaExample.java
的文件中,并使用以下命令进行编译和执行:
javac -cp kafka-clients-2.6.0.jar KafkaExample.java
java -cp kafka-clients-2.6.0.jar:. KafkaExample
请确保你已经下载并添加了kafka-clients-2.6.0.jar
到你的类路径中,以便在命令中使用。
通过以上步骤,你应该可以在云服务器上使用Docker Compose部署Kafka,并编写Java示例代码来使用Kafka。