目录
TTL
(Time To Live
),也就是过期时间,RabbitMQ中可以对消息和队列设置TTL(消息的过期时间),消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。
当队列中的消息存留时间超过了配置的生存时间(TTL),则称该消息已死亡。注意,同一个消息被路由到不同的队列将拥有不同的过期时间,又或者永远不会过期。这取决于消息所存在的队列。一个队列中的死亡消息不会影响到其他队列中与之相同消息的生命周期。
通过队列属性设置:队列中所有消息都有相同的过期时间
对消息进行单独设置:每条消息TTL可以不同
注意:如同时使用2种方式,过期时间以最小的数值为准。
设置队列的过期时间,则消息到过期时间后会从队列删除
设置消息的过期时间,会在消息投递给消费者的时候判断,是否过期,过期则删除
设置的过期时间值都只能是非负整数(n >=0),并且其时间单位为毫秒(ms)。?
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); // 设置过期时间,单位:毫秒
Message message = new Message(json.getBytes(), messageProperties);
//发送消息
amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, message);
System.out.println("发送完毕:" + new Date());
单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;?
@Bean
public Queue directQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
return new Queue(DIRECT_QUEUE, true, false, false, arguments);
}
队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;?
将消息的TTL值设置为0,意味着消息到达队列后将会立即过期不会被队列保存,除非消息能够被立即传递给消费者。所以这给RabbitMQ不支持immediate的推送标记提供了另一种选择性。不像其他标记,当没有使用basic.return方法时,如果队列设置了死信交换机(dead letter exchange),则过期消息将会被路由到死信队列(与死信交换机绑定的队列称为死信队列)。
使用policy设置消息的TTL,需要执行“message-ttl”的值:
rabbitmqctl set_policy TTL ".*" '{"message-ttl:60000"}' --apply-to queues
通过以上命令,为所有的队列都设置了一个60s的消息有效期。
Policy 是一种特殊的运行时参数的用法。Policy 是 vhost 级别的。一个 Policy 可以匹配一个或多个交换器或队列,这样便于批量管理。
Policy 支持动态地修改一些属性参数,这就解决了 RabbitMQ 客户端创建的交换器和队列不能修改的问题,也大大提高了应用的灵活性。
TTL也可以给队列本设置,不仅仅是队列内容。队列将在不被使用(比如,没有消费者)后的一段时间内过期。这个特性可以和队列的自动删除属性(auto-delete queue property)一块使用。
队列的TTL可以在队列声明时指定x-expires字段值进行设置,或者通过设置policy的expires参数值进行设置。该值决定了队列不再被使用后直到被自动删除的时长。不再使用的意思是队列没有订阅的消费者,队列最近没有被重新声明,并且在过期时间basic.get方法没有被调用。这个特性是十分有用的,比如,在通过RabbitMQ实现RPC调用时,会生成大量的回复队列。
服务器保证如果队列在最近的过期时间内没有被使用,那么该队列将会被删除。但是不保证在过期后能够以多快的速度删除。当服务器重启时,队列的租期重新开始计算。
参数x-expires的值或者policy的expires参数的值表示了过期时间的毫秒时长。它必须是一个正整数(不像消息的TTL可以设置为0)。值为1000表示消息不再被使用后1s将会被删除。
设置队列不再使用后40分钟过期:
rabbitmqctl set_policy expiry ".*" '{"expires" : 2400000}' --apply-to queues
Map<String, Object> args = new HashMap<>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
?