????????消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,本文详细介绍两个环节的消息可靠性传递方式:1)、消息传递到交换机的 confirm 模式;2)、消息传递到队列的 Return 模式。
消息从 producer 到 exchange 则会返回一个 confirmCallback;
?消息从 exchange?到 queue 投递失败则会返回一个 returnCallback;
????????消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障。
1)、配置文件application.yml 开启确认模式
spring:
rabbitmq:
host: 192.168.30.88
port: 5672
username: admin
password: admin
virtual-host: test
publisher-confirm-type: correlated # 开启数据关联确认机制
2)、组件绑定关系配置(采用直连交换机模式)
@SpringBootConfiguration
@ConfigurationProperties(prefix = "rabbit.confirm")
public class RabbitConfigCommit {
private String exchangeName;
private String queueName;
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(exchangeName).build();
}
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
public Binding bindingA(DirectExchange exchange, Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("info");
}
}
3)、写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;设置rabbitTemplate的确认回调方法rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
@Service
public class MessageServiceConfirm implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
public void sendMsg(){
Message message = MessageBuilder.withBody("hello long".getBytes()).build();
// 设置消息关联数据
CorrelationData correlationData = new CorrelationData();
correlationData.setId("123");
rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);
}
// 消息确认返回判断
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("消息发送到交换机成功!" + correlationData.getId());
return;
}
System.out.println("消息发送到交换机失败!" + correlationData.getId());
}
}
1)、配置文件application.yml 开启 retrun 模式
spring:
rabbitmq:
host: 192.168.30.88
port: 5672
username: admin
password: admin
virtual-host: test
publisher-confirm-type: correlated
publisher-returns: true # 开启Return模式
2)、组件配置(参考confirm模式)
3)、使用rabbitTemplate.setReturnCallback设置退回函数,当消息从?exchange?路由到 queue?失败后,则会将消息退回给producer,并执行回调函数 returnedMessage(采用匿名内部类/lambda表达式实现)
@Service
public class MessageServiceReturn {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// 不使用匿名内部类,就要实现接口 implements RabbitTemplate.ReturnsCallback
rabbitTemplate.setReturnsCallback(msg -> {
System.out.println("消息发送结果:" + msg.getReplyText());
});
// rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
// @Override
// public void returnedMessage(ReturnedMessage returnedMessage) {
// System.out.println("消息发送结果:" + returnedMessage.getReplyText());
// }
// });
}
public void sendMsg(){
Message message = MessageBuilder.withBody("hello long".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);
}
}
? ? ? ? 本文详细介绍两种模式的消息可靠性保证,关于Rabbitmq 消息持久化、集群配置等更高级的内容关注下面公众号查阅。
????????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)