目录
????????MQ是消息队列(MessageQueue)
RabbitMQ | ActiveMQ | RocketMQ | KafKa | |
社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Java&Scala |
协议支持 | AMQP XMPP STOPM | OpenWrite STOMP REST XMPP AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒 | 毫秒 | 毫秒 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
?主要角色:生产者、交换机、队列、绑定关系、虚拟主机、消费者。
生产者:产生消息,发送到交换机或者队列(基本模型)
交换机:接收生产者的消息,并且将消息路由到相关的队列。
队列:接收交换机或生产的发送的消息,并且负责未消费消息的存储。
绑定关系:将交换机和相关队列绑定起来,形成路由条件。
虚拟主机:类似命名空间,起到隔离和分组的作用。
消费者:从队列取出消息,进行消费。
优点:
缺点:
基于队列完成,生产者和消费者一对一
基于队列完成,一个生产者对应多个消费者,生产者能力很强,消费者能力较弱。
X为交换机,生产者将消息发送到交换机,交换机负责接收消息和将消息广播到所有队列。
通过交换机,将消息路由到不同的队列,交换机类型为:direct
交换机通过通配符识别,将消息转发到不同的队列。
看另外一篇博客
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: ip地址
port: 5672 # 端口
username: root
password: root123
virtual-host: / #主机
@GetMapping("push1")
public String push1(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
String queueName= "qa.message.testQueue";
//如果queueName队列不存在,创建队列
if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
org.springframework.amqp.core.Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
}
rabbitTemplate.convertAndSend(queueName, "hello rabbitMQ");
return "success";
}
准备一个类交给Spring管理
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "qa.message.testQueue")
public void receive(String message) {
System.out.println("接收消息:" + message);
}
这个类应该是在另外一个moudle中,另外的moudel也是需要依赖和配置文件的,和前面的一样就行,启动这个module:
启动之后消息立马就被消费了,
@RabbitListener(queues = "qa.message.testQueue")里面写的是监听队列的名称。
查看控制台,消息被消费了:
? ? 生产者多生产一点消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/push")
public String push(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
String queueName= "testWorkQueue";
//如果queueName队列不存在,创建队列
if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
org.springframework.amqp.core.Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
}
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, "workQueue"+i);
}
return "success";
}
??可以写多个消费者来消费
@RabbitListener(queues = "testWorkQueue")
public void receiveWorkQueue1(String message) {
System.out.println("receiveWorkQueue1消费:" + message+ LocalDateTime.now());
}
@RabbitListener(queues = "testWorkQueue")
public void receiveWorkQueue2(String message) {
System.out.println("receiveWorkQueue2消费:" + message+ LocalDateTime.now());
}
?结果展示
?配置一个交换机,绑定两个队列
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 声明队列
* @return
*/
@Bean
public Queue fanoutQueue() {
return new Queue("fanoutQueue");
}
/**
* 将队列和交换机绑定
* @param queue
* @param fanoutExchange
* @return
*/
@Bean
public Binding binding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue1() {
return new Queue("fanoutQueue1");
}
/**
* 将队列和交换机绑定
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}
发送消息,声明交换机名称和消息:
@GetMapping("push2")
public String push2(){
String fanoutExchange = "fanoutExchange";
rabbitTemplate.convertAndSend(fanoutExchange, "","hello rabbitMQ");
return "success";
}
队列监听:
@RabbitListener(queues = "fanoutQueue1")
public void fanoutQueue1(String message) {
System.out.println("fanoutQueue1消费:" + message+ LocalDateTime.now());
}
@RabbitListener(queues = "fanoutQueue")
public void fanoutQueue(String message) {
System.out.println("fanoutQueue消费:" + message+ LocalDateTime.now());
}
?结果,两个都拿到了函数
绑定队列和交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "testDirectWorkQueue"),
exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.DIRECT),
key = {"A","B"})
)
public void testDirectWorkQueue(String message) {
System.out.println("testDirectWorkQueue接收消息:" + message + " ");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "testDirectWorkQueue1"),
exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.DIRECT),
key = {"A","C"})
)
public void testDirectWorkQueue1(String message) {
System.out.println("testDirectWorkQueue1接收消息:" + message );
}
路由,这里发送消息过去需要你指定路由的key,只有当队列里面有相应地key时,才会将消息路由到队列,例如:Key=A时,两个队列都有消息,和上面的广播一样;当Key=C时,只会路由到testDirectWorkQueue1这个队列。
@GetMapping("push3")
public String push3(){
String directExchange = "directExchange";
rabbitTemplate.convertAndSend(directExchange, "A","hello directExchange");
return "success";
}
@GetMapping("push4")
public String push4(){
String directExchange = "directExchange";
rabbitTemplate.convertAndSend(directExchange, "C","hello directExchange");
return "success";
}
实现了用通配符区分和路由消息:
#:代表0个或多个单词
*:代表一个单词
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "testTopicWorkQueue"),
exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
key = "cq.#")
)
public void testTopicWorkQueue(String message) {
System.out.println("testTopicWorkQueue接收消息:" + message );
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "testTopicWorkQueue1"),
exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
key = "*.hg")
)
public void testTopicWorkQueue1(String message) {
System.out.println("testTopicWorkQueue1接收消息:" + message );
}
返送消息:
前后缀都满足:
@GetMapping("push5")
public String push5(){
String topicExchange = "topicExchange";
rabbitTemplate.convertAndSend(topicExchange, "cq.hg","两个都有");
return "success";
}
?满足后缀:
@GetMapping("push6")
public String push6(){
String topicExchange = "topicExchange";
rabbitTemplate.convertAndSend(topicExchange, "bj.hg","北京火锅吃芝麻酱");
return "success";
}
?满足前缀:
@GetMapping("push7")
public String push7(){
String topicExchange = "topicExchange";
rabbitTemplate.convertAndSend(topicExchange, "cq.cc","重庆串串也好吃");
return "success";
}
当传递的消息是一个对象时,比如:Map
@GetMapping("push8")
public String push8(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
String queueName= "formatQueue";
//如果queueName队列不存在,创建队列
if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
org.springframework.amqp.core.Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
}
Map<String,Object> map = new HashMap<>();
map.put("name","张三");
map.put("age",20);
rabbitTemplate.convertAndSend(queueName, map);
return "success";
}
此时,队列里面存的信息为:
通过序列化存储的,性能不高,也不优雅,这里可以及转换为JSON传输。
引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
创建配置类,加载Bean:
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
再试一次:
?发送的消息转换成功了,只需要发送的时候转换就可以了,接收不需要转化。