TTL 允许设置消息的生存时间,超过指定时间仍未被消费者处理的消息将被视为过期。过期消息可以进入死信队列,用于后续处理或分析。这一特性有助于系统资源的合理利用和消息的有效管理。
@Test
public void test2(){
//1.创建具有过期时间的消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message("单条消息过期".getBytes(),messageProperties);
//2.发送消息
rabbitTemplate.convertAndSend("ttl-exchange","ttl-routingKey",message);
}
**注意: **
RabbitMQ 支持为消息设置优先级,确保重要消息得到更快的处理。生产者可以通过设置消息的优先级,让代理在投递消息时考虑这一因素,提高关键消息的传递速度。
/**
* 队列优先级
*/
@Configuration
public class PriorityConfig {
@Bean("pri-exchange")
public Exchange ackExchange(){
return ExchangeBuilder.topicExchange("pri-exchange").durable(true).build();
}
@Bean("pri-queue")
public Queue ackQueue(){
return QueueBuilder
.durable("pri-queue")
//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
@Bean
public Binding binding(@Qualifier("pri-queue")Queue queue,
@Qualifier("pri-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("pri-routingKey").noargs();
}
}
消息事务是 RabbitMQ 提供的一种机制,用于确保消息的可靠传递。生产者可以开启事务,在事务中发布消息,只有在事务提交时,消息才会被真正发送到队列。这有助于确保消息的原子性传递。
RabbitMQ 支持构建集群,将多个 RabbitMQ 节点组合在一起,提高系统的可用性和容错能力。集群中的节点可以共享队列和交换机的信息,确保在某个节点故障时,其他节点能够继续提供服务。
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
spring:
rabbitmq:
host: 192.168.184.136
port: 5672
virtual-host: /cvdf
username: cvdf
password: cvdf
# 手动签收消息
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息
prefetch: 5
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
死信队列实现如下
/**
* 死信队列相关配置
*/
@Configuration
public class DLXConfig {
//1.死信交换机、队列、binding
@Bean("dead-exchange")
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange("dead-exchange").durable(true).build();
}
@Bean("dead-queue")
public Queue deadQueue(){
return QueueBuilder
.durable("dead-queue")
.build();
}
@Bean
public Binding deadBinding(@Qualifier("dead-queue")Queue queue,
@Qualifier("dead-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("dead-routingKey").noargs();
}
//2.正常交换机、队列、binding
@Bean("normal-exchange")
public Exchange ackExchange(){
return ExchangeBuilder.topicExchange("normal-exchange").durable(true).build();
}
@Bean("normal-queue")
public Queue ackQueue(){
return QueueBuilder
.durable("normal-queue")
.deadLetterExchange("dead-exchange")//绑定死信交换机
.deadLetterRoutingKey("dead-routingKey")//死信routingKey
//过期时间
.ttl(20000)
.maxLength(10) //队列最大长度
.build();
}
@Bean
public Binding binding(@Qualifier("normal-queue")Queue queue,
@Qualifier("normal-exchange")Exchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("normal-routingKey").noargs();
}
}
#生产者发送消息
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//演示消息超过队列长度成为死信
for (int i = 1; i <=20 ; i++) {
rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
}
}
@Test
public void test2(){
//演示消息过期成为死信
for (int i = 1; i <=2 ; i++) {
rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
}
}
}
/**
* 消费者拒收成为死信队列
*/
@Component
public class DLXListener {
@RabbitListener(queues = "normal-queue")
public void ack(Message message, Channel channel) throws IOException, InterruptedException {
//获得消息投递序号,消息每次投递该值都会+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//拒签消息:消息投递序号,是否一次拒签多条消息,拒签消息是否重回队列
channel.basicNack(deliveryTag,true,false);
}
}
死信队列实现延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。设置相应的过期时间,到期都进去死信队列,在死信队列中被消费,从而形成延时效果。
弊端:在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
解决:RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
【*】下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件。
RabbitMQ 可以广泛应用于各种场景,其中一些主要应用包括:
异步通信: 通过消息队列实现应用程序之间的解耦,提高系统的响应速度和并发处理能力。
任务调度: 将任务以消息的形式发送到队列,由消费者进行处理,实现任务的异步执行和调度。
日志处理: 将系统产生的日志通过消息队列发送到日志处理系统,实现日志的集中管理和分析。
微服务通信: 在微服务架构中,通过消息队列进行服务之间的通信,确保服务之间的松耦合和可扩展性。
事件驱动架构: 使用消息队列实现事件的发布和订阅,实现系统中不同组件之间的松耦合通信。
RabbitMQ 与 Spring 框架的集成相对简单,Spring 提供了 spring-amqp 模块用于支持 RabbitMQ。通过 Spring 提供的注解和配置,开发者可以方便地在 Spring 项目中使用 RabbitMQ。
RabbitMQ 提供了一系列的安全性特性,包括身份验证、访问控制列表(ACL)和加密通信等。通过合理配置这些安全性选项,可以确保 RabbitMQ 系统的数据安全和隐私保护。
为了保持 RabbitMQ 系统的高性能,开发者需要进行性能调优和监控。通过监控工具,如 Prometheus 和 Grafana,以及 RabbitMQ 提供的管理插件,可以实时监测系统性能、队列状态和节点健康情况,从而及时发现和解决潜在问题。
RabbitMQ 作为一款强大的消息代理系统,为构建分布式系统和微服务提供了可靠的消息传递解决方案。本博客从 RabbitMQ 的基础概念、核心特性、高级特性、应用场景、集成与安全性等方面进行了详尽的探讨。深入理解 RabbitMQ,将有助于开发者更好地应用它解决实际问题,并构建高性能、可扩展、可靠的分布式应用系统。希望读者通过本文能够对 RabbitMQ 有更深入的了解,从而更好地应用于实际开发中。
以上就是全部内容,如果你有任何问题、意见或建议,都欢迎在评论中分享。让我们继续分享知识,共同成长,一起走向更加美好的未来。感谢你们的阅读,祝愿你们在未来的道路上一帆风顺!