fanout发布订阅模式
基本用法
生产者
import json
import rabbitmq
# Producer: publish one JSON-encoded user record to the fanout exchange.

# Login and broker location for the local RabbitMQ instance.
credentials = rabbitmq.PlainCredentials(
    'zhangdapeng',
    'zhangdapeng520',
)
connection_target = rabbitmq.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/',
    credentials=credentials,
)
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"

connection = rabbitmq.BlockingConnection(connection_target)
try:
    channel = connection.channel()
    channel.exchange_declare(
        exchange=exchange_name,
        exchange_type=rabbitmq.ExchangeType.fanout,
    )

    user = {"id": 1, "name": "张三", "age": 23}
    # ensure_ascii=True escapes the non-ASCII name as \uXXXX sequences, so
    # the encoded body below contains ASCII bytes only.
    message = json.dumps(user, ensure_ascii=True)

    # NOTE(review): fanout exchanges normally ignore the routing key; the
    # queue name is passed here unchanged from the original example.
    # NOTE(review): delivery_mode=2 is presumably the AMQP "persistent"
    # flag - confirm against the rabbitmq wrapper.
    channel.basic_publish(
        exchange=exchange_name,
        routing_key=queue_name,
        body=message.encode('utf8'),
        properties=rabbitmq.BasicProperties(delivery_mode=2),
    )
    print(message)
finally:
    # Close the connection even when declaring or publishing fails, so a
    # failed run does not leave it open.
    connection.close()
消费者
import rabbitmq
import json
# Consumer setup: connect, declare the fanout exchange, then declare a queue
# and bind it to that exchange.
exchange_name = queue_name = "user_manager_fanout"

credentials = rabbitmq.PlainCredentials('zhangdapeng', 'zhangdapeng520')
target = rabbitmq.ConnectionParameters(
    host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)
channel = connection.channel()

# The exchange is declared on the consumer side as well, with the same name
# and type the producer uses.
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)

# NOTE(review): an exclusive queue with a fixed name can presumably be held
# by only one connection at a time - confirm that a single subscriber is
# what this example intends. `result` is not read anywhere in this script.
result = channel.queue_declare(queue=queue_name, exclusive=True)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
    """Decode, print, and then acknowledge one delivered message.

    Args:
        ch: Channel the message arrived on; used to send the acknowledgement.
        method: Delivery metadata; only ``delivery_tag`` is read.
        properties: Message properties (unused).
        body: Raw message bytes, expected to be UTF-8 encoded JSON.

    Raises:
        UnicodeDecodeError: If ``body`` is not valid UTF-8.
        json.JSONDecodeError: If the decoded text is not valid JSON.
    """
    data = body.decode("utf8")
    print(json.loads(data))
    # Acknowledge only after the message was handled successfully. If decoding
    # or parsing raises, the message stays unacknowledged instead of being
    # confirmed and lost (presumably the broker redelivers it once the
    # channel closes - confirm against the broker configuration).
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Register the callback on the queue. auto_ack=False turns automatic
# acknowledgement off; the callback calls basic_ack itself.
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)

# Run the consume loop; the connection is closed however the loop exits.
try:
    channel.start_consuming()
finally:
    connection.close()
简化代码
生产者
import rabbitmq
# Producer (simplified): connection setup and JSON publishing are delegated
# to the rabbitmq helper functions.
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"

connection = rabbitmq.get_connection()
try:
    channel = connection.channel()
    channel.exchange_declare(
        exchange=exchange_name,
        exchange_type=rabbitmq.ExchangeType.fanout,
    )

    user = {"id": 1, "name": "张三", "age": 23}
    # NOTE(review): send_json presumably serializes `user` and publishes it
    # to `exchange_name` with `queue_name` as routing key - confirm against
    # the helper's definition.
    rabbitmq.send_json(channel, user, exchange_name, queue_name)
finally:
    # Close the connection even when declaring or sending fails.
    connection.close()
消费者
import rabbitmq
import json
# Consumer setup (simplified): the connection comes from the helper; the
# exchange, queue and binding are still declared by hand.
exchange_name = queue_name = "user_manager_fanout"

connection = rabbitmq.get_connection()
channel = connection.channel()

channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)

# NOTE(review): an exclusive queue with a fixed name can presumably be held
# by only one connection at a time - confirm that a single subscriber is
# intended. `result` is not read anywhere in this script.
result = channel.queue_declare(queue=queue_name, exclusive=True)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
    """Print the payload of each delivered message.

    The work is delegated to ``rabbitmq.receive_json(ch, method, body)`` and
    whatever it returns is printed. ``properties`` is unused.

    NOTE(review): this callback never acknowledges the message itself, so the
    helper presumably does - confirm against its definition.
    """
    payload = rabbitmq.receive_json(ch, method, body)
    print(payload)
# Hand the connection, the queue name and the callback to the consume helper.
# NOTE(review): presumably this blocks and dispatches every delivery on
# `queue_name` to `callback`, and takes care of closing the connection -
# confirm against the helper's definition.
rabbitmq.consume(connection, queue_name, callback)
进一步简化代码
生产者
import rabbitmq
# Producer (further simplified): channel creation and exchange declaration
# are delegated to rabbitmq.get_fanout_channel as well.
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"

connection = rabbitmq.get_connection()
try:
    # NOTE(review): with only the exchange name given, the helper presumably
    # declares the fanout exchange and returns a channel - confirm.
    channel = rabbitmq.get_fanout_channel(connection, exchange_name)

    user = {"id": 1, "name": "张三", "age": 23}
    rabbitmq.send_json(channel, user, exchange_name, queue_name)
finally:
    # Close the connection even when channel setup or sending fails.
    connection.close()
消费者
import rabbitmq
# Consumer setup (further simplified): the exchange and the queue share one
# name, and the helper receives both.
exchange_name = queue_name = "user_manager_fanout"

connection = rabbitmq.get_connection()
# NOTE(review): with a queue name given, get_fanout_channel presumably also
# declares the queue and binds it to the exchange - confirm.
channel = rabbitmq.get_fanout_channel(connection, exchange_name, queue_name)
def callback(ch, method, properties, body):
    """Consumer callback: print what ``rabbitmq.receive_json`` returns.

    ``ch``, ``method`` and ``body`` are forwarded to the helper unchanged;
    ``properties`` is unused.

    NOTE(review): no acknowledgement is sent here, so the helper presumably
    handles it - confirm against its definition.
    """
    received = rabbitmq.receive_json(ch, method, body)
    print(received)
# Start consuming: the helper receives the connection, the queue name and
# the callback.
# NOTE(review): presumably this call blocks until consumption stops and
# closes the connection afterwards - confirm against the helper.
rabbitmq.consume(connection, queue_name, callback)