Apache RocketMQ(Rocket Message Queue)是一款开源的分布式消息中间件系统,属于Apache软件基金会的顶级项目之一。RocketMQ最初是由阿里巴巴集团开发并开源的,后来成为Apache软件基金会的孵化项目,最终成为顶级项目。
以下是一些关键特性和概念,以及RocketMQ的一般介绍:
分布式架构: RocketMQ被设计为分布式的消息中间件,可以部署在多个节点上,从而提供高可用性和可伸缩性。
消息模型: RocketMQ支持主题(Topic)和队列(Queue)的概念。消息发送者将消息发送到特定主题,消息接收者则从特定队列中消费消息。
可靠性: RocketMQ提供了可靠的消息传递机制,确保消息不会丢失。它支持同步和异步发送消息,并且在消息发送和消费过程中提供事务支持。
水平扩展: RocketMQ可以通过添加更多的消息代理节点来水平扩展,以满足不断增长的消息负载需求。
持久化: RocketMQ支持将消息持久化到磁盘,以确保即使在重启后,消息也不会丢失。
高性能: RocketMQ被设计为高性能的消息中间件,可以处理大量的消息并实现低延迟。
多语言支持: RocketMQ提供了多语言的客户端,使开发者可以使用Java、C++、Python等多种语言进行消息的发送和接收。
监控和管理: RocketMQ提供了丰富的监控和管理工具,帮助用户监视和管理消息中间件的运行状态。
灵活的部署: RocketMQ支持在云环境和本地环境中进行部署,可以与各种存储系统和计算框架集成。
总体而言,Apache RocketMQ是一个强大而灵活的消息中间件系统,适用于各种场景,包括实时数据处理、日志收集、分布式事务等。它的开源性质和活跃的社区使得开发者可以在其基础上构建可靠的分布式系统。
@@@@@@@更详细原理以及集群部署方案介绍请查看之前的文章@@@@@@@@@
RocketMQ生产者(Producer)用于将消息发送到指定的主题(Topic)。以下是一个更详细的RocketMQ生产者实现的解释:
javaCopy code import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; ? public class RocketMQProducer { ? ? public static void main(String[] args) throws Exception { ? ? ? // 创建生产者实例,指定生产者组名称 ? ? ? DefaultMQProducer producer = new DefaultMQProducer("your_producer_group"); ? ? ? ? // 指定NameServer地址,多个地址以分号分隔 ? ? ? producer.setNamesrvAddr("your_namesrv_address"); ? ? ? ? try { ? ? ? ? ? // 启动生产者实例 ? ? ? ? ? producer.start(); ? ? ? ? ? ? // 创建消息实例,指定主题、标签和消息内容 ? ? ? ? ? Message message = new Message("your_topic", "your_tag", "Hello RocketMQ".getBytes()); ? ? ? ? ? ? // 发送消息,并获取发送结果 ? ? ? ? ? SendResult sendResult = producer.send(message); ? ? ? ? ? ? // 打印发送结果 ? ? ? ? ? System.out.println("SendResult: " + sendResult); ? ? ? } finally { ? ? ? ? ? // 关闭生产者实例 ? ? ? ? ? producer.shutdown(); ? ? ? } ? } }
说明:
创建生产者实例: 使用 DefaultMQProducer
类创建一个RocketMQ生产者实例。指定的参数是生产者组的名称,用于在RocketMQ中唯一标识一个生产者。这个名称在整个RocketMQ集群中必须是唯一的。
javaCopy code DefaultMQProducer producer = new DefaultMQProducer("your_producer_group");
指定NameServer地址: 设置RocketMQ的NameServer地址,用于寻找RocketMQ集群的各个Broker节点。多个地址之间使用分号分隔。
javaCopy code producer.setNamesrvAddr("your_namesrv_address");
启动生产者实例: 在所有配置完成后,调用 start
方法启动生产者实例。
javaCopy code producer.start();
创建消息实例: 使用 Message
类创建要发送的消息。指定主题、标签和消息内容。
javaCopy code Message message = new Message("your_topic", "your_tag", "Hello RocketMQ".getBytes());
发送消息: 使用 send
方法发送消息,并获取发送结果。SendResult
包含了消息发送的一些信息,如消息ID、消息队列等。
javaCopy code SendResult sendResult = producer.send(message);
打印发送结果: 打印发送结果,可以根据需要进行日志记录或其他处理。
javaCopy code System.out.println("SendResult: " + sendResult);
关闭生产者实例: 在程序结束时,或者不再使用生产者实例时,调用 shutdown
方法关闭生产者。
javaCopy code producer.shutdown();
确保在使用RocketMQ之前,你已经在RocketMQ中创建了对应的主题。此外,要保证你的应用代码对异常进行适当的处理,例如捕获 Exception
并进行处理或记录错误信息。
RocketMQ消费者(Consumer)用于从指定的主题消费消息。以下是一个更详细的RocketMQ消费者实现的解释:
javaCopy code import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.common.message.MessageExt; ? import java.util.List; ? public class RocketMQConsumer { ? ? public static void main(String[] args) throws Exception { ? ? ? // 创建消费者实例,指定消费者组名称 ? ? ? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group"); ? ? ? ? // 指定NameServer地址,多个地址以分号分隔 ? ? ? consumer.setNamesrvAddr("your_namesrv_address"); ? ? ? ? try { ? ? ? ? ? // 订阅主题和标签, "*" 表示订阅该主题下所有标签的消息 ? ? ? ? ? consumer.subscribe("your_topic", "*"); ? ? ? ? ? ? // 注册消息监听器 ? ? ? ? ? consumer.registerMessageListener(new MyMessageListener()); ? ? ? ? ? ? // 启动消费者实例 ? ? ? ? ? consumer.start(); ? ? ? ? ? ? // 在应用退出时关闭消费者实例 ? ? ? ? ? Runtime.getRuntime().addShutdownHook(new Thread(consumer::shutdown)); ? ? ? } catch (Exception e) { ? ? ? ? ? e.printStackTrace(); ? ? ? } ? } ? ? // 自定义消息监听器实现 MessageListenerConcurrently 接口 ? static class MyMessageListener implements MessageListenerConcurrently { ? ? ? ? @Override ? ? ? public ConsumeConcurrentlyContext consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { ? ? ? ? ? for (MessageExt msg : msgs) { ? ? ? ? ? ? ? System.out.println("Received message: " + new String(msg.getBody())); ? ? ? ? ? } ? ? ? ? ? return ConsumeConcurrentlyContext.CONSUME_SUCCESS; ? ? ? } ? } }
说明:
创建消费者实例: 使用 DefaultMQPushConsumer
类创建一个RocketMQ消费者实例。指定的参数是消费者组的名称,用于在RocketMQ中唯一标识一个消费者组。这个名称在整个RocketMQ集群中必须是唯一的。
javaCopy code DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
指定NameServer地址: 设置RocketMQ的NameServer地址,用于寻找RocketMQ集群的各个Broker节点。多个地址之间使用分号分隔。
javaCopy code consumer.setNamesrvAddr("your_namesrv_address");
订阅主题和标签: 使用 subscribe
方法订阅要消费的主题和标签。在示例中,"*"
表示订阅该主题下所有标签的消息。
javaCopy code consumer.subscribe("your_topic", "*");
注册消息监听器: 使用 registerMessageListener
方法注册消息监听器。在示例中,我们创建了一个实现了 MessageListenerConcurrently
接口的自定义监听器 MyMessageListener
。
javaCopy code consumer.registerMessageListener(new MyMessageListener());
在 MyMessageListener
中,我们实现了 consumeMessage
方法来处理接收到的消息。
启动消费者实例: 在所有配置完成后,调用 start
方法启动消费者实例。
javaCopy code consumer.start();
关闭消费者实例: 在程序结束时,或者不再使用消费者实例时,调用 shutdown
方法关闭消费者。
javaCopy code consumer.shutdown();
确保在使用RocketMQ之前,你已经在RocketMQ中创建了对应的主题。此外,要保证你的应用代码对异常进行适当的处理,例如捕获 Exception
并进行处理或记录错误信息。在实际应用中,你还可能需要处理消息的业务逻辑,例如数据处理、存储等。
本项目为结合spring boot搭建的rocketmq客户端,因此引用rocketmq-spring-boot-starter的jar包,该jar封装了一些公共实现的代码,使用起来较方便,如果依赖其他架构即可引用rocketmq-client的jar包,使用原始的方式实现消息的生产和消费
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.1.0</version> </dependency>
服务端为集群部署,Name-server应全部配上,之间用;分隔
rocketmq: name-server: xx.xx.xx.01:9876;xx.xx.xx.02:9876;xx.xx.xx.03:9876; # 访问地址 producer: group: ${spring.application.name} # 必须指定group send-message-timeout: 3000 # 消息发送超时时长,默认3s retry-times-when-send-failed: 2 # 同步发送消息失败重试次数,默认2 retry-times-when-send-async-failed: 2 # 异步发送消息失败重试次数,默认2
1、创建MQConsumeMsgListenerProcessor公共监听类实现MessageListenerConcurrently接口,并实现相应的方法
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { ? ? @Autowired ? private MessageProcessor messageProcessor; ? ? */** ? \* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 ? \* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS ? ** ***@param\*** *msgList \* ? *** ***@param\*** *consumeConcurrentlyContext \* ? *** ***@return \*** ? **/ \* ? @Override ? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { ? ? if (CollectionUtils.isEmpty(msgList)) { ? ? ? log.info("MQ接收消息为空,直接返回成功"); ? ? ? return ConsumeConcurrentlyStatus.*CONSUME_SUCCESS*; ? ? } ? ? MessageExt messageExt = msgList.get(0); ? ? log.info("MQ接收到的消息为:" + messageExt.toString()); ? ? String topic = null; ? ? String tags = null; ? ? String body = null; ? ? try { ? ? ?topic = messageExt.getTopic(); ? ? ?tags = messageExt.getTags(); ? ? ?body = new String(messageExt.getBody(), "utf-8"); ? ? ? log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body); ? ? ? messageProcessor.handle(topic,tags,body); ? ? } catch (Exception e) { ? ? ? log.error("获取MQ消息内容异常{}",e); ? ? } ? ? ? return ConsumeConcurrentlyStatus.*CONSUME_SUCCESS*; ? } }
2、创建各个业务实现监听类,
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "", topic = "") public class SimpleSyncTopic01Test1Listener implements RocketMQListener<MessageExt>, ? ? RocketMQPushConsumerLifecycleListener { ? ? @Value("${project.rocketmq.consumer.groupName1}") ? private String groupName; ? ? @Value("${project.rocketmq.consumer.topics1}") ? private String topic; ? ? @Autowired ? private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor; ? ? @Override ? public void onMessage(MessageExt messageExt) { ? ? } ? ? @Override ? public void prepareStart(DefaultMQPushConsumer consumer) { ? ? consumer.setConsumerGroup(groupName); ? ? try { ? ? ? String[] tagArr = topic.split("~"); ? ? ? if(tagArr.length == 1){ ? ? ? ? consumer.subscribe(tagArr[0],""); ? ? ? ? log.info("consumer 创建成功 groupName={}, topics={}, tag={}, namesrvAddr={}",groupName, ? ? ? ? ? ? tagArr[0],null,consumer.getNamesrvAddr()); ? ? ? }else { ? ? ? ? consumer.subscribe(tagArr[0], tagArr[1]); ? ? ? ? log.info("consumer 创建成功 groupName={}, topics={}, tag={}, namesrvAddr={}",groupName, ? ? ? ? ? ? tagArr[0],tagArr[1],consumer.getNamesrvAddr()); ? ? ? } ? ? ? } catch (MQClientException e) { ? ? ? log.error("consumer 创建失败!"); ? ? } ? ? consumer.registerMessageListener(consumeMsgListenerProcessor); ? ? } }
3、创建业务公共实现类,在该类中分发到各个消费者中实现真正的业务处理
@Service @Slf4j public class MessageProcessorImpl implements MessageProcessor { ? @Override ? public boolean handle(String topic, String tags, String body) { ? ? // 收到的body(消息体),字节类型,需转为String ? ? String result = new String(topic); ? ? log.info("topic{}",topic); ? ? log.info("tags{}",tags); ? ? log.info("body{}",body); ? ? System.out.println("监听到了消息,消息为:"+ result); ? ? // TODO 进行业务逻辑处理 ? ? return true; ? } }
创建消息生产者消息方法,该方法中包括,发送单项消息、同步消息、异步消息、顺序消息、事务消息、带时间间隔发送、带tag的消息等功能
@Slf4j @Component public class MQProducerService { ? ? public void init(Integer mesageTimeOut,String topic,String tag){ ? ? this.messageTimeOut = mesageTimeOut; ? ? this.topic = topic; ? ? this.tag = tag; ? } ? // @Value("${rocketmq.producer.send-message-timeout}") ? private Integer messageTimeOut; ? ? // 建议正常规模项目统一用一个TOPIC ? private String topic; ? ? private String tag; ? ? // 直接注入使用,用于发送消息到broker服务器 ? @Autowired ? private RocketMQTemplate rocketMQTemplate; ? ? */** ? \* 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等) ? \*/ \* ? public <T>void convertAndSend(T entity) { ? ? if(tag == null || "".equals(tag)){ ? ? ? rocketMQTemplate.convertAndSend(topic , entity); ? ? }else{ ? ? ? rocketMQTemplate.convertAndSend(topic + ":"+tag, entity); ? ? } // ? rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行 ? } ? ? */** ? \* 普通发送同步消息 ? ** ***@author\*** *ZH-liujunliang ? ** ***@param\*** *entity \* ? *** ***@param\*** <*T*> ? **/ \* ? public <T>void send(T entity) { ? ? if(tag == null || "".equals(tag)){ ? ? rocketMQTemplate.send(topic, MessageBuilder.*withPayload*(entity).build()); // 等价于上面一行 ? ? }else{ ? ? rocketMQTemplate.send(topic + ":"+tag, MessageBuilder.*withPayload*(entity).build()); // 等价于上面一行 ? ? } ? ? } ? ? */** ? \* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息) ? \* (msgBody也可以是对象,sendResult为返回的发送结果) ? \*/ \* ? public SendResult sendMsg(String msgBody) { ? ? SendResult sendResult = null; ? ? if(tag == null || "".equals(tag)){ ? ? ? sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.*withPayload*(msgBody).build(), messageTimeOut); ? ? }else{ ? ? ? sendResult = rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build(), messageTimeOut); ? ? } ? ? *log*.info("【sendMsg】sendResult={}", JSON.*toJSONString*(sendResult)); ? ? return sendResult; ? } ? ? */** ? \* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) ? \* (适合对响应时间敏感的业务场景) ? \*/ \* ? public void sendAsyncMsg(String msgBody,SendCallback sendCallback) { ? ? if(tag == null || "".equals(tag)){ ? ? ? rocketMQTemplate.asyncSend(topic, MessageBuilder.*withPayload*(msgBody).build(), sendCallback); ? ? }else{ ? ? ? rocketMQTemplate.asyncSend(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build(), sendCallback); ? ? } ? } ? ? */** ? \* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时) ? \* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ? \*/ \* ? public void sendDelayMsg(String msgBody, int delayLevel) { ? ? if(tag == null || "".equals(tag)){ ? ? ? rocketMQTemplate.syncSend(topic, MessageBuilder.*withPayload*(msgBody).build(), messageTimeOut, delayLevel); ? ? }else{ ? ? ? rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build(), messageTimeOut, delayLevel); ? ? } ? } ? ? */** ? \* 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志) ? \*/ \* ? public void sendOneWayMsg(String msgBody) { ? ? if(tag == null || "".equals(tag)){ ? ? ? rocketMQTemplate.sendOneWay(topic, MessageBuilder.*withPayload*(msgBody).build()); ? ? }else{ ? ? ? rocketMQTemplate.sendOneWay(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build()); ? ? } ? } ? ? */** ? \* 发送带tag的消息,直接在topic后面加上":tag" ? \*/ \* ? public SendResult sendTagMsg(String msgBody) { ? ? if(tag == null || "".equals(tag)){ ? ? ? return rocketMQTemplate.syncSend(topic, MessageBuilder.*withPayload*(msgBody).build()); ? ? }else{ ? ? ? return rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build()); ? ? } ? } ? ? */** ? \* 发送带tag的消息,直接在topic后面加上":tag" ? \*/ \* ? public SendResult sendsyncSendDeliverTimeMills(String msgBody, int timeMills) { ? ? if(tag == null || "".equals(tag)){ ? ? ? return rocketMQTemplate.syncSendDeliverTimeMills(topic, MessageBuilder.*withPayload*(msgBody).build(),timeMills); ? ? }else{ ? ? ? return rocketMQTemplate.syncSendDeliverTimeMills(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build(),timeMills); ? ? } ? } ? ? */** ? \* 发送带tag的消息,直接在topic后面加上":tag" ? \*/ \* ? public <T>void syncSendOrderly(T body, String orderName) { ? ? if(tag == null || "".equals(tag)){ ? ? ? rocketMQTemplate.syncSendOrderly(topic, body,orderName); ? ? }else{ ? ? ? rocketMQTemplate.syncSendOrderly(topic + ":"+tag, body,orderName); ? ? } ? } ? ? */** ? \* 发送带tag的消息,直接在topic后面加上":tag" ? \*/ \* ? public TransactionSendResult sendMessageInTransaction(String msgBody) { ? ? TransactionSendResult result = null; ? ? if(tag == null || "".equals(tag)){ ? ? ? result = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.*withPayload*(msgBody).build(),null); ? ? }else{ ? ? ? result = rocketMQTemplate.sendMessageInTransaction(topic + ":"+tag, MessageBuilder.*withPayload*(msgBody).build(),null); ? ? } ? ? return result; ? } ? }
@Slf4j @RestController @RequestMapping("/rocketmq") public class TestController { ? @Autowired ? private MQProducerService mqProducerService; ? ? @GetMapping("/send") ? public void send() { ? ? mqProducerService.init(30000,"simple-sync-topic-012","test"); ? ? User user = new User(); ? ? user.setName("12123123"); ? ? user.setPassword("123123"); ? ? mqProducerService.send(user); ? } ? ? @GetMapping("/send1") ? public void send1() { ? ? mqProducerService.init(30000,"simple-sync-topic-01","test1"); ? ? User user = new User(); ? ? user.setName("1231313134545645"); ? ? user.setPassword("456456"); ? ? mqProducerService.send(user); ? } ? ? @GetMapping("/sendTag") ? public SendResult sendTag() { ? ? mqProducerService.init(30000,"simple-sync-topic-03","test"); ? ? SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息"); ? ? return sendResult; ? // ? return null; ? } ? ? ? @GetMapping("/sendOne3") ? public void sendOneWayTag() { ? ? mqProducerService.init(30000,"one-way-topic-01",null); ? ? mqProducerService.sendOneWayMsg("发送单向消息"); ? } ? ? @GetMapping("/sendAsyncMsg") ? public void sendAsyncMsg() { ? ? mqProducerService.init(30000,"simple-async-topic-011","test"); ? ? mqProducerService.sendAsyncMsg("发送异步消息",new SendCallback(){ ? ? ? @Override ? ? ? public void onSuccess(SendResult sendResult) { ? ? ? ? *log*.info("发送消息成功 sendResult = {}",sendResult); ? ? ? } ? ? ? @Override ? ? ? public void onException(Throwable throwable) { ? ? ? ? *log*.info("发送消息失败 errorMessage = {}",throwable.getMessage()); ? ? ? ? throwable.printStackTrace(); ? ? ? ? // 发布消息失败可以进行重试发送,或者自己记录失败数据进行人工补偿。 ? ? ? } ? ? }); ? } ? ? @GetMapping("/syncSendOrderly") ? public void syncSendOrderly() { ? ? mqProducerService.init(30000,"one-order-topic-01",""); ? ? User user = null; ? ? for(int i = 0;i < 10; i++){ ? ? ? user = new User(); ? ? ? user.setName("name"+i); ? ? ? user.setPassword("123123"); ? ? ? mqProducerService.syncSendOrderly(user,"name"); ? ? } ? } ? ? @GetMapping("/sendMessageInTransaction") ? public TransactionSendResult sendMessageInTransaction() { ? ? mqProducerService.init(30000,"transation-topic-01","test"); ? ? TransactionSendResult ts = mqProducerService.sendMessageInTransaction("sfasdfasdfsadfsadf"); ? ? return ts; ? } }