该项目介绍了springboot如何集成rabbitMQ消息中间件,实现(直连模式\路由模式\广播模式\主题模式)的消息发送和接收
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
配置
yaml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: ${RABBITMQ_USERNAME:guest}
password: ${RABBITMQ_PASSWORD:guest}
virtual-host: /
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
Java配置类
package com.ukayunnuo.config;
import com.ukayunnuo.constants.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(Boolean.TRUE);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("MQ Message sent successfully! correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause));
rabbitTemplate.setReturnsCallback((message) -> log.warn("MQ Message loss! message:{}", message));
return rabbitTemplate;
}
@Bean
public Queue directRoutingDemoQueue1() {
return new Queue(RabbitConstants.Queue.ROUTING_MODE_QUEUE_DEMO_1);
}
@Bean
public Queue directRoutingDemoQueue2() {
return new Queue(RabbitConstants.Queue.ROUTING_MODE_QUEUE_DEMO_2);
}
@Bean
public DirectExchange directDemoExchange() {
return new DirectExchange(RabbitConstants.Exchange.DIRECT_MODE_EXCHANGE_DEMO);
}
@Bean
public Binding directFanoutDemoBinding1(Queue directRoutingDemoQueue1, DirectExchange directDemoExchange) {
return BindingBuilder.bind(directRoutingDemoQueue1).to(directDemoExchange).with(RabbitConstants.RoutingKey.DIRECT_ROUTING_KEY_DEMO);
}
@Bean
public Binding directFanoutDemoBinding2(Queue directRoutingDemoQueue2, DirectExchange directDemoExchange) {
return BindingBuilder.bind(directRoutingDemoQueue2).to(directDemoExchange).with(RabbitConstants.RoutingKey.DIRECT_ROUTING_KEY_DEMO);
}
@Bean
public Queue fanoutDemoQueue1() {
return new Queue(RabbitConstants.Queue.FANOUT_MODE_QUEUE_DEMO_1);
}
@Bean
public Queue fanoutDemoQueue2() {
return new Queue(RabbitConstants.Queue.FANOUT_MODE_QUEUE_DEMO_2);
}
@Bean
public FanoutExchange fanoutDemoExchange() {
return new FanoutExchange(RabbitConstants.Exchange.FANOUT_MODE_EXCHANGE_DEMO);
}
@Bean
public Binding fanoutDemoBinding1(Queue fanoutDemoQueue1, FanoutExchange fanoutDemoExchange) {
return BindingBuilder.bind(fanoutDemoQueue1).to(fanoutDemoExchange);
}
@Bean
public Binding fanoutDemoBinding2(Queue fanoutDemoQueue2, FanoutExchange fanoutDemoExchange) {
return BindingBuilder.bind(fanoutDemoQueue2).to(fanoutDemoExchange);
}
@Bean
public Queue topicDemoQueue1() {
return new Queue(RabbitConstants.Queue.TOPIC_MODE_QUEUE_DEMO_1);
}
@Bean
public Queue topicDemoQueue2() {
return new Queue(RabbitConstants.Queue.TOPIC_MODE_QUEUE_DEMO_2);
}
@Bean
public TopicExchange topicDemoExchange() {
return new TopicExchange(RabbitConstants.Exchange.TOPIC_MODE_EXCHANGE_DEMO);
}
@Bean
public Binding topicDemoBinding1(Queue topicDemoQueue1, TopicExchange topicDemoExchange) {
return BindingBuilder.bind(topicDemoQueue1).to(topicDemoExchange).with(RabbitConstants.RoutingKey.TOPIC_ROUTING_KEY_DEMO_SINGLE);
}
@Bean
public Binding topicDemoBinding2(Queue topicDemoQueue2, TopicExchange topicDemoExchange) {
return BindingBuilder.bind(topicDemoQueue2).to(topicDemoExchange).with(RabbitConstants.RoutingKey.TOPIC_ROUTING_KEY_DEMO_ALL);
}
}
生产者
@RestController
@RequestMapping("/demo/rabbitmq/produce")
public class RabbitMqProduceController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendMq")
public Result<Boolean> sendMq(@RequestBody MqReq req) {
String routingKey = StrUtil.blankToDefault(req.getRoutingKey(), req.getQueue());
if (StrUtil.isBlank(req.getExchange())) {
rabbitTemplate.convertAndSend(routingKey, MqMsgStruct.builder().msg(req.getMsg()).build());
} else {
rabbitTemplate.convertAndSend(req.getExchange(), routingKey, MqMsgStruct.builder().msg(req.getMsg()).build());
}
return Result.success(Boolean.TRUE);
}
}
HTTP测试
### 发送mq消息 direct 简单模式 -> 端对端
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"queue": "direct.queue.demo",
"msg": "direct simple test msg info"
}
### 发送mq消息 direct路由模式 -> 指定routingKey 示例:{direct.routing.demo}, 注意:如果传的是routingKey, 必须传exchange
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"exchange": "direct.exchange.demo",
"routingKey": "direct.routing.demo",
"msg": "direct routing test msg info"
}
### 发送mq消息 direct路由模式 -> 指定队列 示例:{routing.queue.demo1, routing.queue.demo2}
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"exchange": "direct.exchange.demo",
"routingKey": "routing.queue.demo1",
"msg": "routing test msg info"
}
### 发送mq消息 fanout 模式 -> 广播模式
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"exchange": "fanout.exchange.demo",
"msg": "fanout test fanout msg info -> all"
}
### 发送mq消息 topic 模式 匹配 topic.queue.demo.# 和 topic.queue.*
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"exchange": "topic.exchange.demo",
"routingKey": "topic.queue.demo.yunnuo",
"msg": "topic test msg info"
}
### 发送mq消息 topic 模式 匹配 topic.queue.*
POST http://localhost:8084/demo/rabbitmq/produce/sendMq
Content-Type: application/json
{
"exchange": "topic.exchange.demo",
"routingKey": "topic.queue.demo2.yunnuo",
"msg": "topic test msg info"
}
### 发送mq消息 Delay 模式
POST http://localhost:8084/demo/rabbitmq/produce/sendDelayMq
Content-Type: application/json
{
"exchange": "delay.exchange.demo",
"routingKey": "delay.queue.demo",
"msg": "Delay test msg info"
}
消费者
直连模式(Direct)
直连模式-路由(Routing)
@Slf4j
@Component
public class DirectRoutingModeDemoConsumer {
@RabbitListener(queues = {RabbitConstants.Queue.ROUTING_MODE_QUEUE_DEMO_1})
public void handle1(MqMsgStruct msg, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("DirectRoutingModeDemoConsumer handle1 start --> queue:{}, msg:{}, deliveryTag:{}", message.getMessageProperties().getConsumerQueue(), msg, deliveryTag);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel, deliveryTag);
}
@RabbitListener(queues = {RabbitConstants.Queue.ROUTING_MODE_QUEUE_DEMO_2})
public void handle2(MqMsgStruct msg, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("DirectRoutingModeDemoConsumer handle2 start --> queue:{}, msg:{}, deliveryTag:{}", message.getMessageProperties().getConsumerQueue(), msg, deliveryTag);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel, deliveryTag);
}
}
直连模式-直连(Simple)
@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Queue.DIRECT_MODE_QUEUE_DEMO)
public class DirectSimpleModeDemoConsumer {
@RabbitHandler
public void handle(MqMsgStruct msg, Message message, Channel channel) {
log.info("DirectSimpleModeDemoConsumer handle start --> queue:{}, msg:{}", message.getMessageProperties().getConsumerQueue(), msg);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel);
}
}
广播模式(Fanout)
@Slf4j
@Component
public class FanoutDemoConsumer {
@RabbitListener(queues = {RabbitConstants.Queue.FANOUT_MODE_QUEUE_DEMO_1})
public void handleQueue1(MqMsgStruct msg, Message message, Channel channel) {
log.info("FanoutDemoConsumer handleQueue1 handle consumerQueue:{}, receivedRoutingKey:{}, receivedExchange:{}, msg:{}", message.getMessageProperties().getConsumerQueue(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getReceivedExchange(), msg);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel);
}
@RabbitListener(queues = {RabbitConstants.Queue.FANOUT_MODE_QUEUE_DEMO_2})
public void handleQueue2(MqMsgStruct msg, Message message, Channel channel) {
log.info("FanoutDemoConsumer handleQueue2 handle consumerQueue:{}, receivedRoutingKey:{}, receivedExchange:{}, msg:{}", message.getMessageProperties().getConsumerQueue(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getReceivedExchange(), msg);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel);
}
}
主题模式(Topic)
@Slf4j
@Component
public class TopicDemoConsumer {
@RabbitListener(queues = RabbitConstants.Queue.TOPIC_MODE_QUEUE_DEMO_1)
public void handle1(MqMsgStruct msg, Message message, Channel channel) {
log.info("TopicDemoConsumer handle1 consumerQueue:{}, receivedRoutingKey:{}, receivedExchange:{}, msg:{}", message.getMessageProperties().getConsumerQueue(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getReceivedExchange(), msg);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel);
}
@RabbitListener(queues = RabbitConstants.Queue.TOPIC_MODE_QUEUE_DEMO_2)
public void handle2(MqMsgStruct msg, Message message, Channel channel) {
log.info("TopicDemoConsumer handle2 consumerQueue:{}, receivedRoutingKey:{}, receivedExchange:{}, msg:{}", message.getMessageProperties().getConsumerQueue(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getReceivedExchange(), msg);
ChannelHandlerUtils.basicAckAndRecover(msg, message, channel);
}
}