目录
producer->exchange:确保消息发送到RabbitMQ服务器的交换机上
exchange ->?queue:确保消息从交换机发到队列?
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。
rabbitmq 整个消息投递的路径为:
? ? producer—>rabbitmq broker—>exchange—>queue—>consumer
? ? 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
? ? 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
消息从 producer 到 exchange 则会返回一个 confirmCallback?
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
???????具体代码设置
@Component
public class MessageConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机收到消息后,会回调该方法
*
* @param correlationData 相关联的数据
* @param ack 有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机
* @param cause 消息没有正确地到达交换机的原因是什么
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData = " + correlationData);
System.out.println("ack = " + ack);
System.out.println("cause = " + cause);
if (ack) {
//正常
} else {
//不正常的,可能需要记日志或重新发送
}
}
}
发消息参考代码(需要对RabbitTemplate 进行初始化)
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MessageConfirmCallBack messageConfirmCallBack;
@PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用
public void init() {
rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
}
/**
* 发送消息
*/
public void sendMessage() {
//关联数据对象
CorrelationData correlationData = new CorrelationData();
correlationData.setId("O159899323"); //比如设置一个订单ID,到时候在confirm回调里面,你就可以知道是哪个订单没有发送到交换机上去
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE + 123, "info", "hello", correlationData);
System.out.println("消息发送完毕......");
}
}
? ? RabbitMQ支持事务(transaction),RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。
channel.txSelect(); // 将当前channel设置成事务模式
/**
* ConfirmConfig.exchangeName(交换机名称)
* ConfirmConfig.routingKey(路由键)
* message (消息内容)
*/
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message));
channel.txCommit(); // 提交事务
channel.txRollback(); // 回滚事务
注: 事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发。
事务机制的缺点 :
可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。使用return模式,可以实现消息无法路由的时候返回给生产者;当然在实际生产环境下,我们不会出现这种问题,我们都会进行严格测试才会上线(很少有这种问题);
消息从 exchange –> queue 投递失败则会返回一个 returnCallback?
开启确认模式;使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;
注意配置文件中,开启 退回模式;
spring.rabbitmq.publisher-returns: true |
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;
@Component
public class MessageReturnCallBack implements RabbitTemplate.ReturnsCallback {
/**
* 当消息从交换机 没有正确地 到达队列,则会触发该方法
* 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法
*
* @param returned
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息return模式:" + returned);
}
}
@Service
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MessageReturnCallBack messageReturnCallBack;
@PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用
public void init() {
rabbitTemplate.setReturnsCallback(messageReturnCallBack);
}
/**
* 发送消息
*/
public void sendMessage() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello");
System.out.println("消息发送完毕......");
}
}
使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上。
?备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。到达这个队列以后消费者可以进行处理,通知开发人员进行查看。
当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。?
设置参考代码
Map<String, Object> arguments = new HashMap<>();
//指定当前正常的交换机的备用交换机是谁
arguments.put("alternate-exchange", EXCHANGE_ALTERNATE);
//DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new DirectExchange(EXCHANGE, true, false, arguments);
//return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
QueueBuilder.durable(QUEUE).build();
ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
MessageProperties messageProperties = new MessageProperties();
//设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
?允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行 ?
如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
????????引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。?
?
采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);
#开启手动ack消息消费确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;
如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);