延迟消息实现

发布时间:2024年01月09日
package com.java1234.producer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {


    /**
     * direct交换机名称
     */
    public static final String DIRECT_EXCHANGE="directExchange";


    /**
     * delayedDirect交换机名称
     */
    public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";


    /**
     * TTL_direct交换机名称
     */
    public static final String TTL_DIRECT_EXCHANGE="ttldirectExchange";

    /**
     * dlx_direct交换机名称
     */
    public static final String DLX_DIRECT_EXCHANGE="dlxldirectExchange";

    /**
     * direct交换机名称1
     */
    public static final String DIRECT_EXCHANGE1="directExchange1";


    /**
     * fanout交换机名称
     */
    public static final String FANOUT_EXCHANGE="fanoutExchange";

    /**
     * direct队列名称
     */
    public static final String DIRECT_QUEUE="directQueue";

    /**
     * delayed direct队列名称
     */
    public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";

    /**
     * ttl_direct队列名称
     */
    public static final String TTL_DIRECT_QUEUE="ttldirectQueue";

    /**
     * dlx_direct队列名称
     */
    public static final String DLX_DIRECT_QUEUE="dlxdirectQueue";

    /**
     * direct1队列名称
     */
    public static final String DIRECT_QUEUE1="directQueue1";

    /**
     * direct2队列名称
     */
    public static final String DIRECT_QUEUE2="directQueue2";

    /**
     * 订阅队列1名称
     */
    public static final String SUB_QUEUE1="subQueue1";
    /**
     * 订阅队列2名称
     */
    public static final String SUB_QUEUE2="subQueue2";

    /**
     * direct路由Key
     */
    public static final String DIRECT_ROUTINGKEY="directRoutingKey";

    /**
     * ttl_direct路由Key
     */
    public static final String TTL_DIRECT_ROUTINGKEY="ttl_directRoutingKey";

    /**
     * dlx_direct路由Key
     */
    public static final String DLX_DIRECT_ROUTINGKEY="dlx_directRoutingKey";

    /**
     * delayed_direct路由Key
     */
    public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";

    /**
     * topic队列名称1
     *
     */
    public static final String TOPIC_QUEUE1="topicQueue1";

    /**
     * topic队列名称2
     *
     */
    public static final String TOPIC_QUEUE2="topicQueue2";


    /**
     * direct交换机名称
     */
    public static final String TOPIC_EXCHANGE="topicExchange";

    /**
     * 定义一个direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }


    /**
     * 定义一个TTL direct交换机
     * @return
     */
    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange(TTL_DIRECT_EXCHANGE);
    }


    /**
     * 定义一个delayed direct交换机
     * @return
     */
    @Bean
    public CustomExchange delayedDirectExchange(){
        Map<String, Object> args = new HashMap<>();

        args.put("x-delayed-type", "direct");

        return new CustomExchange(DELAYED_DIRECT_EXCHANGE,"x-delayed-message", true, false, args);
    }


    /**
     * 定义一个DLX direct交换机
     * @return
     */
    @Bean
    public DirectExchange dlxDirectExchange(){
        return new DirectExchange(DLX_DIRECT_EXCHANGE);
    }


    /**
     * 定义一个direct交换机1
     * @return
     */
    @Bean
    public DirectExchange directExchange1(){
        return new DirectExchange(DIRECT_EXCHANGE1);
    }


    /**
     * 定义一个fanout交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * 定义一个TTL direct队列
     * @return
     */
    @Bean
    public Queue ttlDirectQueue(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-message-ttl",1000000);
        map.put("x-dead-letter-exchange",DLX_DIRECT_EXCHANGE);
        map.put("x-dead-letter-routing-key",DLX_DIRECT_ROUTINGKEY);
        map.put("x-max-length",10);
        return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
    }

    /**
     * 定义一个DELAYED direct队列
     * @return
     */
    @Bean
    public Queue delayedDirectQueue(){
        return new Queue(DELAYED_DIRECT_QUEUE);
    }

    /**
     * 定义一个DLX direct队列
     * @return
     */
    @Bean
    public Queue dlxDirectQueue(){
        return new Queue(DLX_DIRECT_QUEUE);
    }

    /**
     * 定义一个direct队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue(DIRECT_QUEUE);
    }




    /**
     * 定义一个direct1队列
     * @return
     */
    @Bean
    public Queue directQueue1(){
        return new Queue(DIRECT_QUEUE1);
    }


    /**
     * 定义一个direct2队列
     * @return
     */
    @Bean
    public Queue directQueue2(){
        return new Queue(DIRECT_QUEUE2);
    }

    /**
     * 定义一个订阅队列1
     * @return
     */
    @Bean
    public Queue subQueue1(){
        return new Queue(SUB_QUEUE1);
    }

    /**
     * 定义一个订阅队列2
     * @return
     */
    @Bean
    public Queue subQueue2(){
        return new Queue(SUB_QUEUE2);
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
     public Binding directBinding(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
     }



    /**
     * TTL定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding ttlDirectBinding(){
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(TTL_DIRECT_ROUTINGKEY);
    }



    /**
     * dlx定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding dlxDirectBinding(){
        return BindingBuilder.bind(dlxDirectQueue()).to(dlxDirectExchange()).with(DLX_DIRECT_ROUTINGKEY);
    }


    /**
     * dlx定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding delayedDirectBinding(){
        return BindingBuilder.bind(delayedDirectQueue()).to(delayedDirectExchange()).with(DLX_DIRECT_ROUTINGKEY).noargs();
    }



    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding1(){
        return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");
    }

    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding3(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");
    }
    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding directBinding4(){
        return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");
    }

    /**
     * 定义一个topic队列1
     */
    @Bean
    public Queue topicQueue1(){
        return new Queue(TOPIC_QUEUE1);
    }

    /**
     * 定义一个topic队列2
     */
    @Bean
    public Queue topicQueue2(){
        return new Queue(TOPIC_QUEUE2);
    }


    /**
     * 定义一个direct交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }


    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");
    }



    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
    }


    /**
     * 定义一个队列和交换机的绑定
     * @return
     */
    @Bean
    public Binding topicBinding3(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");
    }
}

package com.java1234.consumer.service.impl;

import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;

import org.springframework.context.annotation.Bean;

@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    CachingConnectionFactory cachingConnectionFactory;

    @Bean(name="limitContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setPrefetchCount(3);
        return factory;
    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.TTL_DIRECT_QUEUE})
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try{
            System.out.println(System.currentTimeMillis()+"接受到的mq消息:"+message);
            //处理业务
            System.out.println("处理业务"+1/0);
            System.out.println("deliveryTag="+deliveryTag);
            channel.basicAck(deliveryTag,true);
//            if(deliveryTag==5){
//                channel.basicAck(deliveryTag,true);
//            }
        }catch (Exception e){
            e.printStackTrace();
            try {
                channel.basicNack(deliveryTag,false,false);
//                channel.basicReject(deliveryTag,true);
//                Thread.sleep(1000);
            }catch (Exception e1){
                e1.printStackTrace();
            }

        }

    }


//    @Override
//    @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE},containerFactory = "limitContainerFactory")
//    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
//        try{
//            System.out.println("接受到的mq消息:"+message);
//            //处理业务
//            System.out.println("处理业务");
//            System.out.println("deliveryTag="+deliveryTag);
//            if(deliveryTag==5){
//                channel.basicAck(deliveryTag,true);
//            }
//        }catch (Exception e){
//            e.printStackTrace();
//            try {
//                channel.basicNack(deliveryTag,false,true);
                channel.basicReject(deliveryTag,true);
                Thread.sleep(1000);
//            }catch (Exception e1){
//                e1.printStackTrace();
//            }
//
//        }
//
//    }


    @Override
    @RabbitListener(queues = {RabbitMQConfig.DELAYED_DIRECT_QUEUE})
    public void receiveMessage2(String message) {
//        System.out.println("消费者1:接收到的mq消息:"+message);
                System.out.println("队列1接收日志消息:"+message+":"+new Date().toString());
    }


    @Override
    @RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})
    public void receiveMessage3(String message) {
//        System.out.println("消费者2:接收到的mq消息:"+message);
                System.out.println("队列2接收日志消息:"+message);

    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
    public void receiveSubMessage1(String message) {
        System.out.println("订阅者1:接收到的mq消息:"+message);
    }

    @Override
    @RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
    public void receiveSubMessage2(String message) {
        System.out.println("订阅者2:接收到的mq消息:"+message);
    }

}

package com.java1234.consumer.service;


import com.rabbitmq.client.Channel;

public interface RabbitMqService {

    /**
     * 接受消息
     */
    public void receiveMessage(String message, Channel channel, long deliveryTag);

    /**
     * 接受消息
     */
    public void receiveMessage2(String message);

    /**
     * 接受消息
     */
    public void receiveMessage3(String message);


    /**
     * 接受订阅消息1
     */
    public void receiveSubMessage1(String message);
    /**
     * 接受订阅消息2
     */
    public void receiveSubMessage2(String message);
}

package com.java1234.consumer;

import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
       ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
//        RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
//        rabbitMqService.receiveMessage();
    }
}

server:
  port: 81
spring:
  rabbitmq:
    host: 192.168.30.113
    port: 5672
    username: pzy
    password: 123456
    virtual-host: /


package com.java1234.producer.service.impl;

import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * String exchange 交换机名称
     * String routingKey 路由Key
     * Object object 具体发送的消息
     * @param message
     */
    @Override
    public void sendMessage(String message) {
//        amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);
        CorrelationData correlationData=new CorrelationData("3453");

//        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);

    }

    @Override
    public void sendTTLMessage(String message) {
//        MessageProperties messageProperties=new MessageProperties();
//        messageProperties.setExpiration("20000");//设置过期时间 20秒
//        Message msg=new Message(message.getBytes(),messageProperties);
        rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_DIRECT_EXCHANGE,RabbitMQConfig.TTL_DIRECT_ROUTINGKEY,message);
    }

    @Override
    public void sendFanoutMessage(String message) {
        amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);
    }

    @Override
    public void sendRoutingMessage() {
        amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning2","发送warning2级别的消息");
    }

    @Override
    public void sendTopicMessage() {
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.rabbit","飞快的橘色兔子");
//                amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.orange.elephant","慢腾腾的橘色大象");
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.fox","quick.orange.fox");
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.brown.fox","lazy.brown.fox");
        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.brown.fox","quick.brown.fox");
    }

    @Override
    public void sendDelayedMessage(String message, Integer delayTime) {
        amqpTemplate.convertAndSend(RabbitMQConfig.DELAYED_DIRECT_EXCHANGE,RabbitMQConfig.DELAYED_DIRECT_ROUTINGKEY,message,a->{
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }

    /**
     *
     * @param correlationData 消息唯一标识
     * @param ack 交换机是否成功收到消息 true成功 false失败
     * @param cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        System.out.println("confirm方法被执行了..."+correlationData);
        if(ack){
            System.out.println("交换机,消息接收成功"+cause);
        }else{
            System.out.println("交换机,消息接收失败"+cause);
            //我们这里要做一些消息补发的措施
            System.out.println("id="+correlationData.getId());
        }
    }

    /**
     *
     * @param message 消息主体
     * @param replyCode 返回code
     * @param replyText 返回信息
     * @param exchange 交换机
     * @param routingKey 路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return方法被执行...");
        System.out.println("消息主体:"+new String(message.getBody()));
        System.out.println("replyCode:"+replyCode);
        System.out.println("replyText:"+replyText);
        System.out.println("exchange:"+exchange);
        System.out.println("routingKey:"+routingKey);
    }
}

package com.java1234.producer.service;

public interface RabbitMqService {

    /**
     * 发送消息
     * @param message
     */
    public void sendMessage(String message);


    /**
     * 发送TTL消息
     * @param message
     */
    public void sendTTLMessage(String message);

    /**
     * 发送消息
     * @param message
     */
    public void sendFanoutMessage(String message);


    /**
     * 发送路由模式消息
     */
    public void sendRoutingMessage();

    /**
     * 发送Topic模式消息
     */
    public void sendTopicMessage();

    /**
     * 发送延迟消息
     * @param message
     * @param delayTime
     */
    public void sendDelayedMessage(String message,Integer delayTime);
}

package com.java1234.producer;

import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
        RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
//        rabbitMqService.sendTTLMessage("测试消息TTL");
        rabbitMqService.sendDelayedMessage("测试延迟消息10秒",10000);
        rabbitMqService.sendDelayedMessage("测试延迟消息20秒",20000);
//        rabbitMqService.sendRoutingMessage();
//        rabbitMqService.sendTopicMessage();
//        rabbitMqService.sendMessage("confirm确认测试消息");
//                for(int i=0;i < 20;i++){
                     rabbitMqService.sendMessage("测试消息"+i);
//                    rabbitMqService.sendTTLMessage("测试消息TTL"+i);
//            rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
             rabbitMqService.sendFanoutMessage(i+"用户欠费了");
//        }

    }
}

server:
  port: 80


spring:
  rabbitmq:
    host: 192.168.30.113
    port: 5672
    username: pzy
    password: 123456
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true

文章来源:https://blog.csdn.net/m0_68935893/article/details/135476900
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。