direct关键字发布订阅模式
基本用法
发布者
import json
from rabbitmq import pika
import rabbitmq
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
)
connection_target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(connection_target)
exchange_name = "user_manager_direct"
channel = connection.channel()
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)
user = {"id": 1, "name": "张三", "age": 23}
message = json.dumps(user, ensure_ascii=True)
channel.basic_publish(
exchange=exchange_name,
routing_key="error",
body=message.encode('utf8'),
properties=pika.BasicProperties(delivery_mode=2),
)
channel.basic_publish(
exchange=exchange_name,
routing_key="info",
body=message.encode('utf8'),
properties=pika.BasicProperties(delivery_mode=2),
)
print(message)
connection.close()
消费者
import rabbitmq
import json
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
)
target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)
channel = connection.channel()
exchange_name = "user_manager_direct"
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.direct,
)
result = channel.queue_declare(
queue="",
exclusive=True,
)
queue_name = result.method.queue
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key="error",
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key="info",
)
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
ch.basic_ack(delivery_tag=method.delivery_tag)
data = body.decode("utf8")
print(json.loads(data))
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False,
)
try:
channel.start_consuming()
finally:
connection.close()
简化代码
生产者
import json
from rabbitmq import pika
import rabbitmq
connection = rabbitmq.get_connection()
exchange_name = "user_manager_direct"
channel = connection.channel()
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, "error")
rabbitmq.send_json(channel, user, exchange_name, "info")
connection.close()
消费者
import rabbitmq
import json
connection = rabbitmq.get_connection()
channel = connection.channel()
exchange_name = "user_manager_direct"
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.direct,
)
result = channel.queue_declare(
queue="",
exclusive=True,
)
queue_name = result.method.queue
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key="error",
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key="info",
)
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
print(rabbitmq.receive_json(ch, method, body))
rabbitmq.consume(connection, queue_name, callback)
进一步简化代码
生产者
import rabbitmq
connection = rabbitmq.get_connection()
exchange_name = "user_manager_direct"
channel, _ = rabbitmq.get_direct_channel(connection, exchange_name)
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, "error")
rabbitmq.send_json(channel, user, exchange_name, "info")
connection.close()
消费者
import rabbitmq
import json
connection = rabbitmq.get_connection()
exchange_name = "user_manager_direct"
channel, queue_name = rabbitmq.get_direct_channel(connection, exchange_name, ["error", "info"])
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
print(rabbitmq.receive_json(ch, method, body))
rabbitmq.consume(connection, queue_name, callback)