提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
RocketMQ之扩展
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
提示:这里可以添加本文要记录的大概内容:
RocketMQ 是一个高性能、高可靠的分布式消息队列中间件,它为企业级应用提供了强大的消息传输和处理能力。在本扩展中,我们将深入探讨 RocketMQ 的高级特性和消息发布的相关内容。
通过本扩展,你将了解到 RocketMQ 的一些高级特性,如事务消息、延迟消息、消息过滤、消息回溯等。这些特性将帮助你更好地构建和管理复杂的消息传递场景,满足不同业务需求。
另外,我们还将详细介绍 RocketMQ 的消息发布机制,包括如何设置生产者属性、选择合适的 Topic、控制消息发送频率以及处理消息失败等。了解这些内容将有助于你高效地利用 RocketMQ 进行消息发布,确保消息的可靠传输和处理。
无论是对于已经熟悉 RocketMQ 的用户,还是刚刚开始接触它的开发者,本扩展都提供了深入了解和掌握 RocketMQ 的机会。通过学习高级特性和消息发布,你将能够更好地利用 RocketMQ 构建可扩展、高性能的分布式应用。
让我们一起探索 RocketMQ 的更多奥秘,提升你的消息处理能力和应用架构水平!
提示:以下是本篇文章正文内容,下面案例可供参考
我们一般通过数据库来实现数据的持久化,但当数据量达到千万后,其IO读写性能较差。而RocketMQ和RabbitMQ通过消息刷盘至虚拟机/物理机的文件来实现数据的持久化(消息刷盘分同步刷盘和异步刷盘),消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。特别注意的是,在性能上,文件系统>关系型数据库DB。
在RocketMQ中,负载均衡主要可以分为Producer发送消息的负载均衡和Consumer订阅消息的负载均衡。
Producer发送消息的负载均衡,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。
Consumer订阅消息的负载均衡,在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。默认的分配算法是AllocateMessageQueueAveragely。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息。
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
传统XA事务方案:性能不足。为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。
基于普通消息方案:一致性保障困难。该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如,消息发送成功,订单没有执行成功,需要回滚整个事务;订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致;消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。
基于RocketMQ分布式事务消息:支持最终一致性。上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。而基于RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
消息有序指的是按照消息的发送顺序来消费(FIFO)。RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的分区队列;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
在MQ(Message Queue)经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常等因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。RocketMQ具有消息重试的机制,重试也分为两种重试Producer重试和Consumer重试。
// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);
当Broker把消息发送给消费者时,由于网络等原因,消费端没有接收到,所以Broker又重新发消息给消费端,在真正的开发中,我么们更应该考虑消费端的重试机制。消费端的重试机制分为顺序消息的重试和无序消息的重试。
当顺序消息发送失败后,Broker会每隔1秒再次向消费者端发送,这时,应用会出现消息消费被阻塞的情况,所以,在我们使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息失败后,可以通过设置返回状态带到消息重试的效果。消息消费失败后,可被消息队列RocketMQ重复投递的最大次数。
重试次数 | 与上次重试的间隔时间 |
---|---|
1 | 10秒 |
2 | 30秒 |
3 | 1分钟 |
4 | 2分钟 |
5 | 3分钟 |
6 | 4分钟 |
7 | 5分钟 |
8 | 6分钟 |
9 | 7分钟 |
10 | 8分钟 |
11 | 9分钟 |
12 | 10分钟 |
13 | 20分钟 |
14 | 30分钟 |
15 | 1小时 |
16 | 2小时 |
注意:需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
延迟消息的本质是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
例如用户下单后,系统生成一个订单,并将订单信息发送到 RocketMQ 延迟消息队列中,设置延迟时间为 10 分钟。RocketMQ 延迟消息队列将订单信息存储起来,并根据延迟时间进行倒计时。当延迟时间到达后,RocketMQ 延迟消息队列会将订单信息投递给短信或邮件服务提供商。短信或邮件服务提供商接收到订单信息后,发送短信或电子邮件给用户,提醒用户订单的状态。
在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。
举个例子:
//返回结果
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0],
queueOffset=0]
其中msgID对应Unique Key,offsetMsgID对应MessageId,而Message Key我们一般保存在数据库。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
当不怎么关心发送结果的时候,可以使用单向发送,例如日志发送
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
public class OrderProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.139.128:9876");
producer.start();
// 获取指定主题的MQ列表
final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");
Message message = null;
MessageQueue messageQueue = null;
for (int i = 0; i < 100; i++) {
// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
// 发送到同一个MQ
messageQueue = messageQueues.get(i % 8);
message = new Message("tp_demo_02", ("hello rocketmq order create - " + i).getBytes());
producer.send(message, messageQueue);
message = new Message("tp_demo_02", ("hello rocketmq order pay - " + i).getBytes());
producer.send(message, messageQueue);
message = new Message("tp_demo_02", ("hello rocketmq order delivery - " + i).getBytes());
producer.send(message, messageQueue);
}
producer.shutdown();
}
}
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.139.128:9876");
consumer.subscribe("tp_demo_02", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 使用有序消息监听器
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t" +
msg.getQueueId() + "\t" +
new String(msg.getBody())
);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.139.128:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_02", ("global ordered message..." + i).getBytes());
producer.send(message,new MessageQueueSelector() {
@Override
// select方法第一个参数: 指该Topic下有的队列集合
// 第二个参数: 发送的消息
// 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg);
}
}, 1);
}
producer.shutdown();
}
}
public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.139.128:9876");
consumer.subscribe("tp_demo_02", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
public class MyDelayMsgProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_10_01");
producer.setNamesrvAddr("106.75.190.206:9876");
producer.start();
Message message = null;
for (int i = 0; i < 20; i++) {
message = new Message("tp_demo_10", ("hello rocketmq delayMessage - " + i).getBytes());
// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
public class MyDelayMsgConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_10_01");
consumer.setNamesrvAddr("106.75.190.206:9876");
// 设置消息重试次数
consumer.setMaxReconsumeTimes(5);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.subscribe("tp_demo_10", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(System.currentTimeMillis() / 1000);
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t"
+ msg.getQueueId() + "\t"
+ msg.getMsgId() + "\t"
+ msg.getDelayTimeLevel() + "\t"
+ new String(msg.getBody())
);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return LocalTransactionState.ROLLBACK_MESSAGE;
// return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
// 设置事务的监听器
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_12", "hello rocketmq - tx - 02".getBytes());
// 发送事务消息
producer.sendMessageInTransaction(message, "{\"name\":\"zhangsan\"}");
}
}
public class TxConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_12", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
public class QueryingMessageDemo {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.139.128:9876");
//设置消息监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//根据messageId查询消息
MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
System.out.println(message);
System.out.println(message.getMsgId());
consumer.shutdown();
}
}
提示:这里对文章进行总结:
以上就是今天要讲的内容,有什么疑问可以在评论区探讨,大家一起学习与进步!!!