/** * 使用rabbitMQ * 1.引用amqp场景 RabbitAutoConfiguration就会自动生效 * 2.给容器中自动配置了各种api RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate * 所有属性都是 spring.rabbitmq开头 * 3.通过注解@EnableRabbit使用 * 4.监听消息 使用@RabbitListener 注解 必须有@EnableRabbit才能生效 如果是创建交换机,创建队列 不需要有@EnableRabbit注解 * @RabbitListener 可以标在类和方法上 * @RabbitHandler 可以标在方法上 场景 一个队列返回的类型不同 使用这个注解来重载 */
<!-- mq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#虚拟主机
spring.rabbitmq.virtual-host=/
#开启消息生产者发送消息确认
# NONE:禁用发布确认模式,是默认值
#CORRELATED:发布消息成功到交换器后会触发回调方法
#SIMPLE
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调这个
spring.rabbitmq.template.mandatory=true
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Autowired
AmqpAdmin amqpAdmin;
操作创建交换机,创建队列,创建绑定关系
/**
* 1.如何创建Exchange Queue Binding? 使用AmqpAdmin进行创建
* 2.如何接收消息
*/
@Test
public void createExchange(){
//创建一个交换机
//String name 交换机名称, boolean durable 是否持久化, boolean autoDelete 是否自动删除
DirectExchange directExchange = new DirectExchange("hallo-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
System.out.println("单点交换机创建成功"+directExchange);
}
@Test
public void createQueue(){
//String name 队列名称, boolean durable 是否持久化, boolean exclusive 是否排他,
// boolean autoDelete 是否自动删除, @Nullable Map<String, Object> arguments
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
}
/**
* 测试创建绑定关系
*/
@Test
public void createBing(){
//String destination, 目的地
// DestinationType destinationType,目的地类型
//String exchange 交换机
// String exchange, String routingKey, 路由key
// @Nullable Map<String, Object> arguments 自定义参数
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hallo-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
}
/ 用来发送消息的
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 测试发送消息功能
*/
@Test
public void sendMessageTest(){
for(int i=0;i<10;i++){
if(i%2 == 0){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("哈哈");
//发送消息 如果发送的消息是个对象,我们会使用序列化机制,将对象写出去 对象必须实现 Serializable
//也可以用config的方法 将对象类型的消息转为json
//String exchange 发送的交换机, String routingKey 路由key, Object object 发送的内容,
// correlationData 指定uuid,生产者发送消息给服务时候,获得这个参数,从而确定是哪条消息
rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",reasonEntity,
new CorrelationData(UUID.randomUUID().toString()));
}else{
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",orderEntity,
new CorrelationData(UUID.randomUUID().toString()));
}
}
}
接收消息在业务层操作 相关的注解有
@RabbitListener 可以标在类和方法上
@RabbitHandler 可以标在方法上 场景 一个队列返回的类型不同 使用这个注解来重载
@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
/**
* 测试接收消息
* 注解中的queues 声明需要监听的队列,可以是多个
* org.springframework.amqp.core.Message 第一个参数是原生的详细信息
* OrderReturnApplyEntity 第二个参数 消息的返回类型
* Channel 第三个参数 服务获取消息的通道
*
* 场景
* 多个客户端启动时,一个消息只能被一个客户端接收 轮询制度
* 多个消息等待服务处理时,处理完一个后,才会接收下一个
*
* 设置ack配置 确认前是unack状态 若宕机或其他原因失败,系统再次重启状态为ready 手动签收后消息才被处理掉
*
*/
@RabbitHandler
public void recieveMessage(Message message, OrderReturnApplyEntity returnApplyEntity,
Channel channel) throws IOException {
//获取消息体
byte[] body = message.getBody();
//获取消息头
MessageProperties messageProperties = message.getMessageProperties();
System.out.println(returnApplyEntity);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//channel内按照顺自增的
if(deliveryTag%2 == 0){
//当业务流程执行完后 手动签收消息 deliveryTag 签收的消息序号, b 是否批量签收
channel.basicAck(deliveryTag,false);
}else{
//拒签 拒签的序号,是否批量操作,是否归队 若false则丢弃 若ture则归队
channel.basicNack(deliveryTag,false,false);
// 和上面作用一样 只是不能设置批量操作这个参数
// channel.basicReject(deliveryTag,false);
}
}
@RabbitHandler
public void recieveMessage2(Message message, OrderEntity order,
Channel channel){
//获取消息体
byte[] body = message.getBody();
//获取消息头
MessageProperties messageProperties = message.getMessageProperties();
System.out.println(order);
}
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
*/
@PostConstruct //MyRabbitConfig对象创建完成后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
//1.生产者发送消息到服务这一步的确认
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//CorrelationData correlationData, 当前消息的唯一关联数据(可以理解为id)
// boolean b, 判断消息是否成功收到
// String s 如果失败,显示失败原因
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("correlationData==="+correlationData+"b==="+b+"s==="+s);
}
});
//2.设置消息没有投递给指定的队列,就触发这个失败的回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//获得投递失败的消息的详细信息
Message message = returnedMessage.getMessage();
//获得投递消息失败的交换机
String exchange = returnedMessage.getExchange();
//获得投递消息失败的状态码
int replyCode = returnedMessage.getReplyCode();
//获得投递消息失败的文本
String replyText = returnedMessage.getReplyText();
//获得投递消息失败的路由key
String routingKey = returnedMessage.getRoutingKey();
}
});
//3.消费端确认(保证每个消息 被正确消费,此时才可以删除这个消息)
}
保证消息可靠投递
生产者到交换机 使用confirmCallback; 得到消息投递的结果
交换机到队列 使用 returnCallback; 如果消息接收失败,将会触发,得到失败消息的信息
队列到消费者 使用ack机制;进行手动确认
1.点对点模式,如果路由key和绑定关系完全匹配,交换机才能收到;
2.主题订阅模式,路由key和绑定关系以单词维度匹配,路由key中可以用? “#”代表匹配多个或0个
“*” 代表匹配一个
3.广播模式,不受路由key局限,只要交换机和队列有绑定关系,就可以收到消息;
4.helder性能比较低,一般不适用,私信队列不太熟悉