理论上来说,订单到期关闭的实现方式可以有:
使用RabbitMQ延迟队列实现的过程是:
@Configuration
public class RabbitConfig {
/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlExchange() {
return (DirectExchange) ExchangeBuilder
.directExchange("order.exchange.ttl")
.durable(true)
.build();
}
/**
* 订单延迟队列
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable("order.queue.ttl")
.build();
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with("order.key.ttl");
}
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderExchange() {
return (DirectExchange) ExchangeBuilder
.directExchange("order.exchange.cancel")
.durable(true)
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "order.exchange.ttl");
args.put("x-dead-letter-routing-key", "order.key.ttl");
return QueueBuilder.durable("order.cancel").withArguments(args).build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) {
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with("order.key.ancel");
}
}
@Component
@Slf4j
public class CancelOrderProducter {
@Resource
private RabbitTemplate rabbitTemplate;
@Retryable(value= {Exception.class},maxAttempts = 3)
public void sendMessage(String orderNumber, final long delayTimes) {
log.info("发送mq,订单号:{}",orderNumber);
// 给延迟队列发送消息
rabbitTemplate.convertAndSend("order.exchange.cancel",
"order.key.cancel", orderNumber, message-> {
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
log.info("send orderNumber:{}", orderNumber);
}
}
/**
* 取消订单消息的处理者
*/
@Slf4j
@Component
@RabbitListener(queues = "order.cancel.ttl")
public class CancelOrderConsumer {
@Autowired
private OrderService orderService;
@RabbitHandler
public void handle(String orderNumber, Channel channel, Message message) {
try {
log.info("订单超时取消,订单号:{}",orderNumber);
//业务处理
orderService.cancelOrder(orderNumber);
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("超时订单消息接收成功,订单号:{}",orderNumber);
}catch (Exception e){
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
log.info("超时订单消息接收失败,订单号:{},重新入队",orderNumber);
} catch (IOException ioException) {
log.info("异常信息为:{}", e.getMessage());
}
}
}
}
/**
* 自动取消未支付订单任务
*/
@Component
public class AutoCancelUnPayOrderTask implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(AutoCancelUnPayOrderTask.class);
@Resource
private NormalOrderService normalOrderService;
private final RBlockingDeque<String> blockingDeque;
private final RDelayedQueue<String> delayedQueue;
public AutoCancelUnPayOrderTask(RedissonClient redissonClient) {
blockingDeque = redissonClient.getBlockingDeque(CacheKeyConstant.UN_PAY_ORDER_QUEUE_KEY);
delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
}
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(() -> {
while (true) {
String orderSn;
try {
orderSn = blockingDeque.take();
} catch (InterruptedException exception) {
logger.info("AutoCancelUnPayOrderTask:取未支付订单打断:" + exception, exception);
Thread.currentThread().interrupt();
continue;
} catch (Exception exception) {
logger.error("AutoCancelUnPayOrderTask:取未支付订单Redis队列数据异常:" + exception, exception);
continue;
}
logger.info("到达订单【{}】超时时间", orderSn);
try {
consume(orderSn);
} catch (Exception exception) {
logger.error("AutoCancelUnPayOrderTask:系统取消订单{}异常:" + exception, orderSn, exception);
}
}
}).start();
}
private void consume(String orderSn) {
normalOrderService.systemCancel(orderSn);
}
public void supply(String orderSn, long ttl, TimeUnit timeUnit) {
delayedQueue.offer(orderSn, ttl, timeUnit);
}
public void discard(String orderSn) {
boolean removed = delayedQueue.remove(orderSn);
if (!removed) {
logger.info("移除自动取消未支付订单【{}】任务失败", orderSn);
}
}
}