学习一样新技术、新框架,最重要的是学习其思想、原理。即原理性思维。
如果是因为工作原因,需要快速上手RabbitMQ,本篇或许适合你。
192.168.204.179 rabbit
docker pull rabbitmq:3.8.2-management
docker run -d --restart=always \
--hostname rabbit \
--name=rabbitmq \
-p 5671:5617 -p 5672:5672 -p4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 \
rabbitmq:3.8.2-management
5671:开启管理插件时,管理界面接口
5671、5672:AMQP
4369:守护进程
25672:
访问管理界面:192.168.204.179:15672
默认账号密码:guest
角色列表:
每个虚拟主机就相当于一个独立的MQ服务器,虚拟主机之间相互隔离。
<!--amqp协议的起步依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbit测试依赖坐标-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--SpringBoot测试依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
# RabbitMQ 服务host地址
spring.rabbitmq.host=rabbit
# 端口
spring.rabbitmq.port=5672
# 虚拟主机地址
spring.rabbitmq.virtual-host=/mytest
# rabbit服务的用户名
spring.rabbitmq.username=test
# rabbit服务的密码
spring.rabbitmq.password=123456
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
rabbitTemplate.convertAndSend("/myqueue", "hello 小兔子!");
}
}
@Component
@RabbitListener(queues = "/myqueue")
public class SimpleListener {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====接收消息====>"+msg);
}
}
相比于简单模式,这个模式下会存在多个消费者。
@Component
@RabbitListener(queues = "/myqueue")
public class SimpleListener2 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====222接收消息====>"+msg);
}
}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
for (int i = 0; i < 10000; i++) {
rabbitTemplate.convertAndSend("/myqueue", "hello 小兔子!" + i);
}
}
}
消息将发给交换机,交换机的类型决定了它会怎么处理这个消息:
符号 “#” 匹配一个或多个词,符号""匹配不多不少一个词。因此“audit.#” 能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到 “audit.irs”。
总结就是交换机负责消息转发,不进行数据存储,如果没有找到绑定的队列或匹配的队列,消息将会丢失。
将同一个消息广播到订阅的多个消费者手中。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class MQSPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
for (int i = 0; i < 10000; i++) {
rabbitTemplate.convertAndSend("/fanout_exchange", null, "hello 小兔子!" + i);
}
}
}
@Component
@RabbitListener(queues = "/fanout_queue1")
public class SimpleListener_Fanout1 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====222接收消息====>"+msg);
}
}
@Component
@RabbitListener(queues = "/fanout_queue2")
public class SimpleListener_Fanout2 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====3333接收消息====>"+msg);
}
}
在将队列绑定到交换机的时候,需要指定路由key;发送消息的时候也要指明路由key。
/**
* 路由
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class MQRoutingTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
for (int i = 0; i < 10000; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("/routing_exchange", "info", "hello 小兔子!" + i);
} else {
rabbitTemplate.convertAndSend("/routing_exchange", "err", "hello 小黑子!" + i);
}
}
}
}
@Component
@RabbitListener(queues = "/routing_err1")
public class Routing_Err1 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====routing_err1接收消息====>"+msg);
}
}
@Component
@RabbitListener(queues = "/routing_err2")
public class Routing_Err2 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====routing_err2接收消息====>"+msg);
}
}
@Component
@RabbitListener(queues = "/routing_info1")
public class Routing_info1 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====routing_info1接收消息====>"+msg);
}
}
#:匹配零个或多个。
*:匹配一个。
/**
* topc
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class MQTopicTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
/**
* 参数1:消息队列名称
* 参数2:消息内容
*/
for (int i = 0; i < 10000; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("/topic_exchange", "test.info", "hello 小兔子!" + i);
} else {
rabbitTemplate.convertAndSend("/topic_exchange", "test.err", "hello 小黑子!" + i);
}
rabbitTemplate.convertAndSend("/topic_exchange", "my.test.warn", "hello 小白子!" + i);
}
}
}
@Component
@RabbitListener(queues = "/topic_info")
public class Topic1 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====topic_info接收消息====>"+msg);
}
}
@Component
@RabbitListener(queues = "/topic_err")
public class Topic2 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====topic_err接收消息====>"+msg);
}
}
@Component
@RabbitListener(queues = "/topic_warn")
public class Topic3 {
@RabbitHandler
public void simpleHandler(String msg){
System.out.println("=====topic_warn接收消息====>"+msg);
}
}
rabbitmq提供了两种方式来保证投递的可靠性:
spring:
rabbitmq:
password: 123456
username: test
virtualHost: /mytest
port: 5672
host: rabbit
publisherReturns: true
publisherConfirmType: SIMPLE
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到exchange成功");
} else {
log.info("消息发送到exchange失败");
}
}
}
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息发送失败:{}", returnedMessage.getMessage());
}
}
@Configuration
public class RabbitCallBackConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
rabbitTemplate.setReturnsCallback(new RabbitReturnCallback());
}
}
@RestController
public class TestController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("confirmCallBack")
public String confirmCallBack() {
for (int i = 0; i < 10000; i++) {
rabbitTemplate.convertAndSend("/routing_exchange", "err", "hello 小黑子!" + i);
}
return "ok";
}
@GetMapping("returnCallBack")
public String returnCallBack() {
for (int i = 0; i < 10000; i++) {
// 不存在的routingkey
rabbitTemplate.convertAndSend("/routing_exchange", "err2", "hello 小黑子!" + i);
}
return "ok";
}
}
消费者消息确认有三种类型:
为什么手动确认更安全?
消费者处理消息失败时,可以重新处理消息。其它优势:1.消费者可以根据处理能力控制消费速率;2.批量确认多个信息。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义监听器,监听到消息之后,立即执行onMessage方法
*/
@Component
public class CustomAckConsumerListener implements ChannelAwareMessageListener {
private static final AtomicInteger errCount = new AtomicInteger();
/**
* 监听到消息之后执行的方法
*
* @param message 消息内容
* @param channel 消息所在频道
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "UTF-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:" + msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
// 模拟业务错误
if(errCount.getAndIncrement() % 2 == 0) {
System.out.println("业务报错,重回队列");
channel.basicNack(deliveryTag, false, true);
return;
}
// 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
// 参数1:消息投递标签
// 数2:是否批量签收:true一次性签收所有,false,只签收当前消息
channel.basicAck(deliveryTag, false);
System.out.println("手动签收完成:{}");
}
}
import cn.lsj.consumer.listener.CustomAckConsumerListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ListenerConfiguration {
/**
* 注入消息监听器适配器
*
* @param customAckConsumerListener 自定义监听器对象
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(CustomAckConsumerListener customAckConsumerListener) {
//创建自定义监听器适配器对象
return new MessageListenerAdapter(customAckConsumerListener);
}
/**
* 注入消息监听器容器
*
* @param connectionFactory 连接工厂
* @param messageListenerAdapter 自定义的消息监听器适配器
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
//简单的消息监听器容器对象
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
//绑定消息队列
container.setQueueNames("/routing_err3");
//设置连接工厂对象
container.setConnectionFactory(connectionFactory);
//设置消息监听器适配器
container.setMessageListener(messageListenerAdapter);
//设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自 动确认消息)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}
应用场景:
要求:必须为手动确认消息。
package cn.lsj.consumer.config;
import cn.lsj.consumer.listener.CustomAckConsumerListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ListenerConfiguration {
/**
* 注入消息监听器适配器
*
* @param customAckConsumerListener 自定义监听器对象
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(CustomAckConsumerListener customAckConsumerListener) {
//创建自定义监听器适配器对象
return new MessageListenerAdapter(customAckConsumerListener);
}
/**
* 注入消息监听器容器
*
* @param connectionFactory 连接工厂
* @param messageListenerAdapter 自定义的消息监听器适配器
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
//简单的消息监听器容器对象
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
//绑定消息队列
container.setQueueNames("/routing_err3");
//设置连接工厂对象
container.setConnectionFactory(connectionFactory);
//设置消息监听器适配器
container.setMessageListener(messageListenerAdapter);
//设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自 动确认消息)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消费端限流,每次拉取消息多少条,默认是250条
container.setPrefetchCount(1);
return container;
}
}
消息过期时间到未被消费则被自动清楚。
可以针对消息设置,也可以针对队列设置。
rabbitTemplate.convertAndSend("/routing_exchange", "err", "hello 小黑子!" + count.getAndIncrement(), m -> {
// 10秒
m.getMessageProperties().setExpiration(String.valueOf(10000L));
return m;
});
参数message-TTL
,单位毫秒,在创建队列的时候添加。
当消息过期未消费;当消费者拒接消息且不放回源队列;当队列队列达到最大限度时。
以上三个场景,导致了Dead message(死消息)的产生。
消息到达队列之后,不会马上被消费,而是等待一段时间之后才会被消费。
死信队列配合过期队列实现延迟队列。
如下图,我们不设置过期队列的消费者,让消息过期之后进入死信队列,达到延迟效果。