tar -zxvf 文件名
cd kafka_2.12-3.14/
gedit config/zookeeper.properties
dataDir=~/workspace/redux/kafka/zoodata #zoodata 自己创建
admin.serverPort=8088
#启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#查看状态,者需要java jdk,没有则安装前一张取安装
jps
#启动kafka
bin/kafka-server-start.sh config/server.properties
gedit config/server.properties
pip install kafka-python
6. 先执行接受端,在执行生产者
7. 生产者
import numpy as np
from kafka import KafkaProducer
import json
# 序列化函数
def serialize_array(array):
return json.dumps(array.tolist()).encode('utf-8')
# 创建生产者实例
producer = KafkaProducer(bootstrap_servers='baomx:9092',
value_serializer=serialize_array)
# 发送 NumPy 数组
array = np.array([1, 2, 3])
producer.send('your-topic', array)
producer.flush()
from kafka import KafkaConsumer
import json
import numpy as np
# 反序列化函数
def deserialize_array(message):
return np.array(json.loads(message.decode('utf-8')))
# 创建消费者实例
consumer = KafkaConsumer('your-topic',
bootstrap_servers='baomx:9092',
value_deserializer=deserialize_array)
# 接收消息
for message in consumer:
array = message.value
print("Received array:", array)
cd /tmp
rm -rf kafka-logs
#具体目录在server.properties中
#打开 server.properties 文件,查找 log.dirs 行
#log.dirs=/path/to/kafka-logs