Java小案例-RocketMQ的11种消息类型,你知道几种?(请求应答消息)

发布时间:2023年12月17日

前言

Rocket的请求应答消息是指在使用Rocket(这里可能是RocketMQ或者Rocket框架)进行通信时,客户端发送一个请求到服务端,然后服务端处理该请求并返回一个响应的过程中的数据交换。

在RocketMQ中:

请求应答消息通常涉及到以下几个步骤:

  1. 生产者(Producer)创建一个消息,并将其发送到Broker(消息中间件服务器)。
  2. Broker接收到消息后,可能需要进行存储、路由或者其他处理操作。
  3. 如果请求是需要立即响应的(例如RPC调用),Broker会在处理完消息后生成一个响应消息,并通过网络返回给生产者。
  4. 生产者接收到响应消息后,可以根据响应内容进行相应的业务处理。

在Rocket框架中:

请求应答消息通常涉及到HTTP请求和响应:

  1. 客户端(通常是Web浏览器或者API客户端)向Rocket应用服务器发送一个HTTP请求,请求可能包含JSON、XML或者其他格式的数据。
  2. Rocket框架接收到请求后,根据路由规则将请求分发到对应的处理器函数(handler)。
  3. 处理器函数处理请求,这可能包括查询数据库、计算结果或者其他业务逻辑。
  4. 处理完成后,处理器函数构建一个HTTP响应,响应中包含处理结果以及可能的状态码、头部信息等。
  5. Rocket框架将响应返回给客户端,客户端解析响应并进行相应的处理。

无论是RocketMQ还是Rocket框架,请求应答消息都是系统间或者组件间通信的基本机制,用于实现功能调用、数据交换或者状态同步。

请求应答消息

这个消息类型比较有意思,类似一种RPC的模式

生产者发送消息之后可以阻塞等待消费者消费这个消息的之后返回的结果

生产者通过过调用request方法发送消息,接收回复消息

public?class?Producer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//创建一个生产者,指定生产者组为?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?启动生产者
????????producer.start();


????????Message?message?=?new?Message("sanyouTopic",?"三友的java日记".getBytes());
????????
????????//发送消息,拿到响应结果,?3000代表超时时间,3s内未拿到响应结果,就超时,会抛出RequestTimeoutException异常
????????Message?result?=?producer.request(message,?3000);
????????System.out.println("接收到响应消息:"?+?result);

????????//?关闭生产者
????????producer.shutdown();
????}

}

而对于消费者来着,当消费完消息之后,也要作为生产者,将响应的消息发送出去

public?class?Consumer?{
????public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{

????????//创建一个生产者,指定生产者组为?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?启动生产者
????????producer.start();


????????//?通过push模式消费消息,指定消费者组
????????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("sanyouConsumer");
????????//?指定NameServer的地址
????????consumer.setNamesrvAddr("192.168.200.143:9876");

????????//?订阅这个topic下的所有的消息
????????consumer.subscribe("sanyouTopic",?"*");
????????//?注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
????????consumer.registerMessageListener(new?MessageListenerConcurrently()?{

????????????@Override
????????????public?ConsumeConcurrentlyStatus?consumeMessage(List<MessageExt>?msgs,
????????????????????????????????????????????????????????????ConsumeConcurrentlyContext?context)?{
????????????????for?(MessageExt?msg?:?msgs)?{
????????????????????System.out.printf("消费消息:%s",?new?String(msg.getBody())?+?"\n");

????????????????????try?{
????????????????????????//?用RocketMQ自带的工具类创建响应消息
????????????????????????Message?replyMessage?=?MessageUtil.createReplyMessage(msg,?"这是响应消息内容".getBytes(StandardCharsets.UTF_8));
????????????????????????//?将响应消息发送出去,拿到发送结果
????????????????????????SendResult?replyResult?=?producer.send(replyMessage,?3000);
????????????????????????System.out.println("响应消息的结果?=?"?+?replyResult);
????????????????????}?catch?(Exception?e)?{
????????????????????????e.printStackTrace();
????????????????????}

????????????????}

????????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????}
????????});

????????//?启动消费者
????????consumer.start();

????????System.out.printf("Consumer?Started.%n");
????}
}

这种请求-应答消息实现原理也比较简单,如下图所示

生产者和消费者,会跟RocketMQ服务端进行网络连接

所以他们都是通过这个连接来发送和拉取消息的

当服务端接收到回复消息之后,有个专门处理回复消息的类

这个类就会直接找到发送消息的生产者的连接,之后会通过这个连接将回复消息发送给生产者

RocketMQ底层是基于Netty通信的,所以如果你有用过Netty的话,应该都知道,就是通过Channel来发送的

联系方式

关于文章中大家有任何疑问可以通过关注公众号《编程乐学》进行留言,同时,公众号还有更多有趣的项目以及关于学习编程的笔记资料大家可以看看,欢迎大家进行留言。

文章来源:https://blog.csdn.net/weixin_54449574/article/details/135052281
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。