docker间通讯

发布时间:2024年01月15日

注意事项

  1. 不是科班出身,具体里面什么意思不知道,一步一步爬坑爬出来了
  2. 通过kafka实现docker之间的通讯,主要是实现numpy的传递

安装过程

  1. 得安装Zookeeper 和 Kafka,但是kafka2.8版本后直接包含了zookeeper,就省了很大事,我下载的3.4.1 链接
    在这里插入图片描述
    添加链接描述

  2. 安装过程

tar -zxvf 文件名
cd kafka_2.12-3.14/
gedit config/zookeeper.properties
dataDir=~/workspace/redux/kafka/zoodata			#zoodata  自己创建
admin.serverPort=8088
#启动  zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#查看状态,者需要java  jdk,没有则安装前一张取安装
jps
#启动kafka
bin/kafka-server-start.sh  config/server.properties
  1. 两个容器均这么安装,然后其中一个容器彻底不用管,只需要更改另一个容器的参数, 两个 docker 的端口是不一样的,要不然只能执行一个 docker,另一个 docker 的 kafka 启动不了
gedit config/server.properties
pip install kafka-python

在这里插入图片描述
6. 先执行接受端,在执行生产者
7. 生产者

import numpy as np
from kafka import KafkaProducer
import json
# 序列化函数
def serialize_array(array):
    return json.dumps(array.tolist()).encode('utf-8')
# 创建生产者实例
producer = KafkaProducer(bootstrap_servers='baomx:9092',
                         value_serializer=serialize_array)
# 发送 NumPy 数组
array = np.array([1, 2, 3])
producer.send('your-topic', array)
producer.flush()
  1. 接受者
from kafka import KafkaConsumer
import json
import numpy as np

# 反序列化函数
def deserialize_array(message):
    return np.array(json.loads(message.decode('utf-8')))
# 创建消费者实例
consumer = KafkaConsumer('your-topic',
                         bootstrap_servers='baomx:9092',
                         value_deserializer=deserialize_array)
# 接收消息
for message in consumer:
    array = message.value
    print("Received array:", array)
  1. 安装完成
    在这里插入图片描述
  2. 错误1
    原因: 更改了broker.id 然后ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn’t match stored broker.id Some(0) in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
    解释: 如果更改了现有 Kafka Broker 的 broker.id,但其数据目录(由 log.dirs 指定)中的 meta.properties 文件仍包含旧的 Broker ID。直接删除 log 文件即可.
cd /tmp
rm -rf kafka-logs
#具体目录在server.properties中
#打开 server.properties 文件,查找 log.dirs 行
#log.dirs=/path/to/kafka-logs
文章来源:https://blog.csdn.net/qq_43385138/article/details/135590355
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。