Redis发布订阅功能是Redis的一种消息传递模式,允许多个客户端之间通过消息通道进行实时的消息传递。在发布订阅模式下,消息的发送者被称为发布者(publisher),而接收消息的客户端被称为订阅者(subscriber)。
在Redis中,发布者可以将消息发布到一个或多个频道(channel),而订阅者可以选择订阅感兴趣的频道以接收相关的消息。同时,一个订阅者也可以订阅多个频道。当有消息发布到已被订阅的频道时,所有订阅该频道的客户端都能够接收并处理这些消息。
发布订阅功能在实时消息推送、事件通知、即时通讯等场景中具有广泛的应用。在分布式系统中,它也经常被用于解耦消息的发送和接收,实现松耦合的消息通信机制。
通过发布订阅功能,Redis为开发者提供了一种简单而高效的消息传递机制,使得不同的模块或系统之间可以实现解耦、实时通信和消息广播等操作。
安装redis后,在redis.conf内,将bind 127.0.0.1一行注掉,然后重启redis(查找redis-server)
import json
import redis
class RedisChannelManager:
def __init__(self):
# 初始化 Redis 连接和订阅发布功能
# 通过传递这些参数,StrictRedis 类将创建一个与 Redis 服务器的连接
# ,并使用指定的主机地址、端口号和数据库。你可以通过这个连接执行各种 Redis 命令和操作,包括发布和订阅消息。
self.redisClient = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
#方法用于创建一个 Redis 发布/订阅对象。通过这个对象,你可以订阅一个或多个通道,并接收其他客户端发布到这些通道的消息。
self.pubsub = self.redisClient.pubsub()
def subscribe(self,channelName):
# 订阅指定通道
self.pubsub.subscribe(channelName)
self.channelName = channelName
# pubsub.listen() 是 Redis 发布/订阅对象的方法,用于监听订阅的通道,并等待接收消息。
# 当有消息到达时,listen() 方法将会阻塞当前线程,并返回一个生成器对象,通过这个生成器对象可以迭代获取消息。
for item in self.pubsub.listen():
if item['type'] == 'message':
self.processMessage(item['data'])
def processMessage(self, message):
message=message.decode('utf-8')
# 处理收到的消息
print(f"Received message: {message}")
data = json.loads(message)
if data.get('type') == 'pic':
self.handlePicBusiness(data)
def handlePicBusiness(self, data):
# 处理特定业务类型为 pic 的消息
pic_list = data.get('pic_list')
print(pic_list)
# 进行业务操作,比如存储到数据库或者处理图片
#发布报警信息
def handleAlarm(self, data):
self.publishMessage(data)
def publishMessage(self, data,name):
while True:
# 发布消息到指定通道
message = json.dumps(data)
self.redisClient.publish(name, message)
time.sleep(2)
# 创建两个通道管理对象
channel_manager1 = RedisChannelManager()
channel_manager2 = RedisChannelManager()
# 在不同的线程中订阅两个通道
import threading
import time
thread1 = threading.Thread(target=channel_manager1.subscribe,args=("c1",))
# 发布业务类型为 pic 的消息
pic_data = {
'type': 'pic',
'pic_list': ['pic1.jpg', 'pic2.jpg']
}
thread2 = threading.Thread(target=channel_manager1.publishMessage,args=(pic_data,"c1"))
thread2.start()
thread1.start()
thread2.join()
thread1.join()