RabbitMQ是由Erlang语言开发、基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现的消息队列。消息队列是一种应用程序之间的通信方法,在分布式系统开发中应用非常广泛。RabbitMQ支持Windows、Linux/Unix、Mac OS X等操作系统,以及包括Java在内的多种编程语言。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
具体的模型和概念这里就不多说了,直接看代码。
package com.example.demoamqp.controller;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.example.demoamqp.entity.Order;
import com.example.demoamqp.send.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
 * REST controller that acts as the message producer: each endpoint forwards
 * its request parameters to one of the injected sender components.
 *
 * @version 1.0.0
 * @className: TestSendMqController
 * @description: message sender
 * @author: zhangjunfa
 * @date: 2023/6/16 11:05
 */
@Slf4j
@RequestMapping
@RestController
public class TestSendMqController {

    /** Value passed as {@code delayTime} to {@link DeadSender#sendDelay} by {@code /sendDead}. */
    private static final int DEAD_LETTER_DELAY_TIME = 2000;

    // Collaborators are assigned exactly once in the constructor, so they are final.
    private final Sender sender;
    private final FanoutSender fanoutSender;
    private final TopicSender topicSender;
    private final DeadSender deadSender;
    private final DelayQueueSender delayQueueSender;
    private final Delay2Sender delay2Sender;

    /**
     * Creates the controller with all sender components it delegates to.
     *
     * @param sender           sender used by {@code /send} and {@code /sendSimple}
     * @param fanoutSender     sender used by {@code /sendCode}
     * @param topicSender      sender used by {@code /sendTopic}
     * @param deadSender       sender used by {@code /sendDead}
     * @param delayQueueSender sender used by {@code /sendDelay}
     * @param delay2Sender     sender used by {@code /sendDelay2}
     */
    public TestSendMqController(Sender sender,
                                FanoutSender fanoutSender,
                                TopicSender topicSender,
                                DeadSender deadSender,
                                DelayQueueSender delayQueueSender,
                                Delay2Sender delay2Sender) {
        this.sender = sender;
        this.fanoutSender = fanoutSender;
        this.topicSender = topicSender;
        this.deadSender = deadSender;
        this.delayQueueSender = delayQueueSender;
        this.delay2Sender = delay2Sender;
    }

    /**
     * Waits three seconds and then passes {@code param} to {@link Sender#send}.
     *
     * @param param the text to send
     * @return the literal {@code "success"}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @PostMapping("/send")
    public Object send(@RequestParam(name = "param") String param) throws InterruptedException {
        // NOTE(review): this blocks the request thread for 3 s before sending;
        // presumably a demo artifact to simulate slow work - confirm before reuse.
        Thread.sleep(3000);
        sender.send(param);
        return "success";
    }

    /**
     * Builds an {@link Order} with a generated id, a generated order number,
     * the given name and the current time, then passes it to {@link Sender#sendSimple}.
     *
     * @param orderName the name stored on the new order
     * @return the literal {@code "success"}
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendSimple")
    public Object sendSimple(@RequestParam(name = "orderName") String orderName) throws InterruptedException {
        Order order = new Order();
        order.setId(IdUtil.getSnowflakeNextId());
        order.setOrderName(orderName);
        order.setOrderNo(IdUtil.nanoId());
        order.setCreatedTime(DateUtil.date());
        sender.sendSimple(order);
        return "success";
    }

    /**
     * Generates a random integer via {@code RandomUtil.randomInt(100000, 999999)},
     * logs it, and passes it as a string to {@link FanoutSender#sendCode}.
     *
     * @return the literal {@code "success"}
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendCode")
    public Object sendCode() throws InterruptedException {
        int randomInt = RandomUtil.randomInt(100000, 999999);
        log.info("生产者生成了一个验证码:{}", randomInt);
        this.fanoutSender.sendCode(String.valueOf(randomInt));
        return "success";
    }

    /**
     * Passes the message and routing key to {@link TopicSender#sendMsg}.
     *
     * @param msg        the message text
     * @param routingKey the routing key handed to the topic sender
     * @return the literal {@code "success"}
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendTopic")
    public Object sendTopic(@RequestParam(name = "msg") String msg,
                            @RequestParam(name = "routingKey") String routingKey) throws InterruptedException {
        this.topicSender.sendMsg(msg, routingKey);
        return "success";
    }

    /**
     * Delay queue built on a dead-letter design: passes the message to
     * {@link DeadSender#sendDelay} with a fixed delay time of
     * {@link #DEAD_LETTER_DELAY_TIME}.
     *
     * @param msg the message text
     * @return a fixed confirmation string
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendDead")
    public Object sendDead(@RequestParam(name = "msg") String msg) throws InterruptedException {
        this.deadSender.sendDelay(msg, DEAD_LETTER_DELAY_TIME);
        return "success 我是死信队列";
    }

    /**
     * Delay-queue design: passes the message to {@link DelayQueueSender#sendMsg}.
     *
     * @param msg the message text
     * @return a fixed confirmation string
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendDelay")
    public Object sendDelay(@RequestParam(name = "msg") String msg) throws InterruptedException {
        this.delayQueueSender.sendMsg(msg);
        return "success 我是延迟队列";
    }

    /**
     * Delay-queue design with a caller-supplied delay: passes the message and
     * delay time to {@link Delay2Sender#sendDelay2}.
     *
     * @param msg       the message text
     * @param delayTime the delay handed to the sender (unit defined by {@code Delay2Sender})
     * @return a fixed confirmation string
     * @throws InterruptedException declared for signature compatibility
     */
    @PostMapping("/sendDelay2")
    public Object sendDelay2(@RequestParam(name = "msg") String msg,
                             @RequestParam(name = "delayTime") Integer delayTime) throws InterruptedException {
        this.delay2Sender.sendDelay2(msg, delayTime);
        return "success 我是延迟队列";
    }
}
package com.example.demoamqp.config;
import com.example.demoamqp.conatants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the message queues, exchanges and bindings and registers them
 * as beans in the container.
 *
 * @author ross
 */
@Configuration
public class QueueConfig {

    /**
     * Builds a queue with the given name using the durable / exclusive /
     * auto-delete flags defined in {@link Constants}.
     *
     * @param name the queue name
     * @return the new queue definition
     */
    private static Queue queueWithSharedFlags(String name) {
        return new Queue(name, Constants.durable, Constants.exclusive, Constants.autoDelete);
    }

    /**
     * Creates the queue named {@code ross_amq}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue createQueue() {
        final String queueName = "ross_amq";
        return new Queue(queueName);
    }

    /* ------------------------- publish / subscribe mode ------------------------- */

    /** Queue for mail messages. */
    @Bean
    public Queue mailQueue() {
        return queueWithSharedFlags(Constants.MQ_MAIL_QUEUE);
    }

    /** Queue for phone messages. */
    @Bean
    public Queue phoneQueue() {
        return queueWithSharedFlags(Constants.MQ_PHONE_QUEUE);
    }

    /** Fanout exchange that the mail and phone queues are bound to. */
    @Bean
    public FanoutExchange fanoutExchange() {
        final String exchangeName = Constants.MQ_FANOUT_EXCHANGE;
        return new FanoutExchange(exchangeName, Constants.durable, Constants.autoDelete);
    }

    /**
     * Binds the mail queue to the fanout exchange.
     *
     * @return the binding
     */
    @Bean
    public Binding mailBinding() {
        Queue queue = mailQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds the phone queue to the fanout exchange.
     *
     * @return the binding
     */
    @Bean
    public Binding phoneBinding() {
        Queue queue = phoneQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /* ------------------------------- topic mode -------------------------------- */

    /** Topic queue A. */
    @Bean
    public Queue topicAQueue() {
        return queueWithSharedFlags(Constants.MQ_TOPIC_QUEUE_A);
    }

    /** Topic queue B. */
    @Bean
    public Queue topicBQueue() {
        return queueWithSharedFlags(Constants.MQ_TOPIC_QUEUE_B);
    }

    /** Topic exchange that queues A and B are bound to. */
    @Bean
    public TopicExchange topicMyExchange() {
        final String exchangeName = Constants.MQ_TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName, Constants.durable, Constants.autoDelete);
    }

    /**
     * Binds queue A to the topic exchange with the binding key {@code topic.xxx}.
     *
     * @return the binding
     */
    @Bean
    public Binding topicAQueueBinding() {
        Queue queue = topicAQueue();
        TopicExchange exchange = topicMyExchange();
        return BindingBuilder.bind(queue).to(exchange).with("topic.xxx");
    }

    /**
     * Binds queue B to the topic exchange with the wildcard binding key
     * {@code topic.*} (in a topic exchange, {@code *} stands for exactly one word).
     *
     * @return the binding
     */
    @Bean
    public Binding topicBQueueBinding() {
        Queue queue = topicBQueue();
        TopicExchange exchange = topicMyExchange();
        return BindingBuilder.bind(queue).to(exchange).with("topic.*");
    }
}
package com.example.demoamqp.receiver;
import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Dead-letter queue consumer: listens on {@code Constants.MQ_DELAY_QUEUE}
 * and logs every message it receives.
 *
 * @className: MqDelayReceiver
 * @projectName: demo-one
 * @auth: rosszhang
 * @date: 2023/12/28 16:53
 */
@Slf4j
@Component
public class MqDelayReceiver {

    /**
     * Logs the received message together with the current time at debug level.
     *
     * @param msg the message payload delivered by the listener container
     */
    @RabbitListener(queues = Constants.MQ_DELAY_QUEUE)
    public void delayConsume(String msg) {
        // Capture the consumption time first so the log call itself stays short.
        final Object consumedAt = DateUtil.date();
        log.debug("[消费者消费信息:{},时间:{}", msg, consumedAt);
    }
}
package com.example.demoamqp.send;
import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Dead-letter queue producer: publishes messages with a per-message
 * expiration to the normal exchange.
 *
 * @className: DeadSender
 * @projectName: demo-one
 * @auth: rosszhang
 * @date: 2023/12/28 16:49
 */
@Slf4j
@Component
public class DeadSender {

    private AmqpTemplate rabbitAmqpTemplate;

    /**
     * Setter injection point for the AMQP template used to publish messages.
     *
     * @param rabbitAmqpTemplate the template to publish with
     */
    @Autowired
    public void setRabbitAmqpTemplate(AmqpTemplate rabbitAmqpTemplate) {
        this.rabbitAmqpTemplate = rabbitAmqpTemplate;
    }

    /**
     * Sends {@code msg} to {@code Constants.MQ_NORMAL_EXCHANGE} with routing key
     * {@code Constants.MQ_NORMAL_ROUTING_KEY}, setting the message's expiration
     * property to {@code delayTime}.
     *
     * @param msg       the message payload
     * @param delayTime the expiration to set on the message; logged as
     *                  {@code delayTime / 1000} seconds, i.e. treated as milliseconds.
     *                  Must not be negative.
     * @throws IllegalArgumentException if {@code delayTime} is negative
     */
    public void sendDelay(String msg, int delayTime) {
        // A negative value is not a valid message expiration; fail fast here
        // instead of handing the broker an invalid TTL.
        if (delayTime < 0) {
            throw new IllegalArgumentException("delayTime must not be negative, got " + delayTime);
        }
        final String expiration = String.valueOf(delayTime);
        rabbitAmqpTemplate.convertAndSend(
                Constants.MQ_NORMAL_EXCHANGE,
                Constants.MQ_NORMAL_ROUTING_KEY,
                msg,
                message -> {
                    message.getMessageProperties().setExpiration(expiration);
                    return message;
                }
        );
        // Divide by 1000.0 so that delays that are not whole seconds (e.g. 1500)
        // are not truncated by integer division in the log output.
        log.debug("[生产者:]发送消息:{},时间{},延迟{}秒", msg, DateUtil.date(), delayTime / 1000.0);
    }
}
源码大家可以到我的Gitee仓库查看:
Gitee仓库地址
下面这个是我的个人公众号「只会写Bug的程序猿」,大家可以关注一下,一键三连,相互交流学习。