????异步调用方式就是基于消息通知的方式,一般包含了三个角色。
????有了异步调用,发送者和接受者之间就解耦了,双方不必知道对方的存在,也不必等待支付的结果。
????就比如外卖柜,外卖小哥把外卖放到外卖柜,然后他就可以走了,他不必知道谁来取,而我也不知道谁送的,只是取走外卖就行,这就是解耦。
但是异步调用也会存在以下问题
所以异步调用不能乱用,只有对对方的执行结果不关心的流程,才可以使用异步。就比如查询就不能使用异步,点击查询按钮是要立刻查出内容的。
上篇文章讲过docker,直接在docker里安装,几条条命令搞定(docker是真方便)
????首先,直接拉包,docker pull rabbitmq:3-management
,然后run。
docker run \
-e RABBIT_DEFAULT_USER=wangmq \ (环境变量,默认用户名)
-e RABBIT_DEFAULT_PASS=123456 \ (设置密码)
-v mq-plugins:/plugins \ (挂载mq-plugins数据卷,对应mq里的插件目录)
--name mq \ (容器名)
--hostname mq \ (主机名)
-p 15672:15672 \ (端口映射)
-p 5672:5672 \
--network wang \ (网络)
-d \ (后台加载)
rabbitmq:3.8-management
然后docker ps
查看,启动成功,其中15672是它提供的控制台端口(图形化界面),5672是消息通知端口,发送消息用的,所以访问5672端口。
????一般的流程为,消息发送到mq,mq转给consumer,但是中间是有层队列的,consumer是肯定要跟queue绑定,而消息也不是直接发到队列,先发到交换机,然后交换机负责把消息路由给队列。所以整个消息发送的模型为==生产者发送消息到交换机,交换机把消息路由给队列,消费者监听队列,拿到消息。==除去publisher和consumer,中间的部门就是Broker(消息代理)。一个Broker是可以创建多个VirtualHost,起到数据隔离作用,交换机和队列都有属于自己的VirtualHost。
????首先,要在Java中使用Mq,我们要使用Spring提供的AMQP,AMQP是用于在应用程序之间传递业务的开放标准。该协议与语言、平台无关,更符合微服务中的独立性要求。
????Spring AMQP是基于AMQP的一套API规范,提供了模板来发送和接收信息。包含了两部分,spring-amqp是基础抽象,spring-rabbit是底层默认实现
代码
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.150.101 #主机
port: 5672 #端口
virtual-host: /mq
username: wangmq
password: 123456
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimplequeue(){
//队列名称
String queueName = "simple.queue";
//消息
String message = "hello,mq";
//发送消息
rabbitTemplate.converAndSend(queueName,message);
}
@Slf4j
@Component
public class TestMq {
@RabbitListener(queue = "simple.queue")
public void listen(String msg){
System.out.println("消费者收到了simple.queue的消息:"+ msg);
}
}
????默认情况下,RabbitMq会将消息一次轮询投递给绑定在队列上的每一个消费者。但是这没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。
????因此需要修改配置,将preFetch值改为1,确保同一时刻最多投递给消费者一条消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息
????生产环境都是会通过交换机(exchange)来发送消息,而不是直接发送到队列,交换机的类型有三种
Fanout:广播
Direct:定向
Topic:话题
????Fanout exchange会将接收到的消息广播到每一个跟它绑定的消息队列,所以也叫广播模式。
????Direct Exchange会将接收到的信息根据规则路由到指定的Queue,因此成为定向路由。
????Topic Exchange与Direct Exchange类似,区别在于Routing Key可以是多个单词的列表,并且以.
分割
????Queue与Exchange指定Bindging Key时可以使用通配符:
#
:代指0个或多个单词
*
:代指一个单词
????SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系:
????mq如果发送的消息不是message类型,那么会对消息进行序列化处理,Spring对消息的处理是由org.springframrwork.amqp.support.converter.MessageConverter
处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
????会有安全问题隐患
所以建议 采用JSON序列化,在生产者和消费者的pom都要引入jackson依赖
<!-- jackson序列化依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
然后在启动类声明序列化方式
@Bean
public MessageConverter jacksonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
????拿订单流程举例,用户下单后调用订单业务代码,同步执行余额扣减和支付状态更新操作,然后使用mq异步处理更新订单状态流程,期间会先调用消息代理(mq),然后进行交易服务,最后更新订单状态。这期间每一步都有可能因为mq的宕机而出bug,所以mq的可靠性是一个注意点。消息丢失的可能性主要有:
????有时候由于网络的波动,可能会出现客户端连接MQ失败的情况,这时候可以通过配置开启失败后的重连机制。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重连机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后的下次等待时长倍数,=initial-interval * multiplier
max-attempts: 3 #最大重试次数