如果还没有学习Rabbit
的建议去学一下我的另一篇RabbitMq的使用再来看这个实际的使用`
**当前问题:**处理时间超时导致前端页面响应超时是一个比较常见的问题。这可能由于后端执行任务时间过长、网络延迟、资源不足等原因引起。
解决的思路有:
解决方法 | 步骤 | 缺点 |
---|---|---|
优化后端代码 | 检查后端代码,看是否有可以优化的地方。可能存在一些复杂、低效或者重复的操作,通过优化这些代码可以提升后端执行效率。 | 优化的效果不明显,当数据包的处理时间太长的时候处理时间是无法缩短的 |
设置合理的超时时间 | 在前端与后端的通信中,设置合理的超时时间。如果后端处理任务时间较长,适当增加前端请求的超时时间,但也要避免设置过长的超时时间。 | 前端响应时间太久,造成前端页面的假死 |
优化数据库的查询 | 如果后端涉及数据库查询,确保数据库表的索引设置得当,优化查询语句,以提高查询效率。 | 能解决查询的问题,但是数据处理的时间是解决不了的 |
异步处理+轮询查询 | 如果后端执行的任务比较耗时,考虑使用异步处理方式。将一些耗时的任务交给异步任务队列或线程池来处理,从而不阻塞主线程。并且添加一个轮询查询的接口,轮询查询处理的情况。 |
当我们的处理的耗时非常久的时候,前端一直在等待加载,会造成假死的状态,并且有可能会断开连接。
当异步处理的时候直接将消息丢入到消息队列中,挨个的由消费者进行处理。处理的时候使用Redis
根据时间戳来记录每一个处理的状态,最后再用一个轮询的接口进行查询。
<!-- Spring Boot 项目中更方便地集成 AMQP 的一种方式-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 普通的spring项目的mq客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!--开启web客户端,方便进行调用演示-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- redis用来做轮询查询接口-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
rabbitmq:
addresses: 192.168.1.110
port: 5672
username: rabbit
password: rabbit
virtual-host: /test
#开启消息回执
publisher-confirm-type: correlated
publisher-returns: true
# 设置消息的确认模式,默认的模式为auto自动确认,manual手动确认,所以需要channel去手动确认
listener:
simple:
acknowledge-mode: manual
redis:
#Redis服务器地址
host: localhost
#端口
port: 6379
#使用几号数据库
database: 0
@Configuration
public class RabbitMqConfig {
// 配置交换机
@Bean
public FanoutExchange buildFanoutExchange() {
return ExchangeBuilder
.fanoutExchange("fanoutExchange")
.build();
}
// 配置消息队列
@Bean
public Queue buildFanoutQueue() {
return QueueBuilder
.durable("fanoutQueue")
.build();
}
// 绑定交换机和消息队列
@Bean
public Binding buildFanoutBinding() {
return BindingBuilder
.bind(buildFanoutQueue())
.to(buildFanoutExchange());
}
}
这里我使用的是fanout
(扇出)型交换机,这时一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。使用此类型
消息队列中的数据,如果迟迟没有消费者来处理,那么就会一直占用消息队列的空间。比如我们模拟一下抢车票的场景,用户下单高铁票之后,会进行抢座,然后再进行付款,但是如果用户下单之后并没有及时的付款,这张票不可能一直让这个用户占用着,因为你不买别人还要买呢,所以会在一段时间后超时,让这张票可以继续被其他人购买。
这时,我们就可以使用死信队列,将那些用户超时未付款的或是用户主动取消的订单,进行进一步的处理,以下类型的消息都会被判定为死信:
@Configuration
public class DeadMqConfig {
// 死信交换机
@Bean
public Exchange dmExchange() {
return ExchangeBuilder
.fanoutExchange("dm.direct")
.build();
}
// 死信队列
@Bean
public Queue dmQueue() {
return QueueBuilder
.nonDurable("dm-queue") // 消息队列的名称
.build();
}
@Bean
public Binding dmBinding() {
return BindingBuilder
.bind(dmQueue())
.to(dmExchange())
.with("dm-routing") // 交换机和消息队列的路由对应关系
.noargs();
}
}
配置了Object和String的序列化方式
@Configuration
public class RedisConfig {
//编写自定义redisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//String的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
//hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
//value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
//hash的value的序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
创建redis工具类,进行快速的进行数据存储和读取,最后用作轮询查询时候的标识处理数据的状态
@Component
public class RedisUtils {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
创建消费者(Listener)对消息队列进行监控,监听到消息以后进行消费,数据处理的各个阶段在Redis
存储处理的状态,方便轮询查询。
注意:Redis需要设置过期时间(一周),防止内存泄漏
@Service
@Slf4j
public class MqConsumer {
@Autowired
RedisUtils redisUtils;
/**
* @param channel 用于确保消息被正确的处理
* @param timeStamp 使用时间戳标识不同的消息请求,方便后期轮询查询
* @param tag 每一个消息的独一无二的tag
*/
@RabbitListener(queues = "fanoutQueue")
public void BuidConsumer(Channel channel, String timeStamp, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
redisUtils.set(timeStamp, "消息开始处理......");
log.info("开始处理编号为: " + timeStamp + " 的消息了");
dealData(timeStamp);
channel.basicAck(tag, false); // 确认消息
log.info("理编号为: " + timeStamp + " 的消息处理完成");
} catch (Exception e) {
try {
log.info(e.getMessage(), e);
channel.basicNack(tag, false, false); // 丢入死信队列
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
@RabbitListener(queues = "dm-queue")
public void buildDeadConsumer(Channel channel, String timeStamp, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
log.info(timeStamp + "进入处理异常"); // 日志记录处理异常进入死信队列的消息
channel.basicAck(tag, false);
redisUtils.set(timeStamp, "异常处理完成 ...."); // 消息队列直接处理完成
} catch (IOException e) {
e.printStackTrace();
}
}
// 耗时处理数据的方法
public void dealData(String timeStamp) {
redisUtils.set(timeStamp, "消息正在处理 ....");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
redisUtils.set(timeStamp, "消息处理发生异常");
e.printStackTrace();
}
redisUtils.set(timeStamp, "消息处理完成....", 7*24*60*60L); // 设置过期时间,防止内存泄漏
}
}
服务层直接把处理的消息放入交换机中,并且返回一个时间戳用来作为任务的唯一标识,用作后面的轮询查询的接口的参数。
**消息回执:**为了确保消息已将被成功的接受并且处理,也保证不会重复的传输数据,一旦消费者成功处理了消息并发送了回执,RabbitMQ 就知道这个消息已经被正确处理。这有助于防止消息被重复处理,确保消费者不会在某些故障情况下多次处理同一条消息。
Service层:
@Service
@Slf4j
public class UserService {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void regCallback() {
// 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 如果ack为true代表消息已经收到
if (!ack) {
// 这里可能要进行其他的方式进行存储
log.error("MQ队列应答失败,失败原因是:" + cause.toString());
return;
}
log.info("消息已经成功发送 !");
}
});
}
public String dealData() {
String timeStamp =Long.toString(System.currentTimeMillis());
// 将消息丢进交换机
rabbitTemplate.convertAndSend("fanoutExchange", "", timeStamp, new CorrelationData(timeStamp));
return timeStamp;
}
}
rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制:
实现ConfirmCallback接口
public interface ConfirmCallback {
void confirm(CorrelationData correlationData, boolean ack, String cause);
}
correlationData
:与消息关联的数据,通常是一个唯一标识符。在发送消息时,可以通过 CorrelationData
设置关联数据,然后在确认回调中获取。ack
:表示消息是否被成功确认。如果为 true
,表示消息已成功发送到 Exchange;如果为 false
,表示消息发送失败。cause
:如果消息发送失败,cause
中包含了失败的原因。Controller层:
@RestController()
@RequestMapping("/user")
public class UserController {
@Autowired
UserService userService;
@Autowired
RedisUtils redisUtils;
// 耗时久的处理数据接口
@GetMapping("/deal")
public String dealData() {
return userService.dealData();
}
// 轮询查询接口
@GetMapping("/status/{timeStamp}")
public String getStatus(@PathVariable String timeStamp) {
return redisUtils.get(timeStamp).toString();
}
}
调用开始处理的接口然后调用服务层将消息放入交换机,然后给消费者进行处理。并且返回一个时间戳用作的轮询处理的状态的标识。
@RestController("/user")
public class UserController {
@Autowired
UserService userService;
@Autowired
RedisUtils redisUtils;
@GetMapping("/deal")
public String dealData() {
return userService.dealData();
}
// 根据时间戳进行轮询查询
@GetMapping("/status/{timeStamp)")
public String getStatus(@PathVariable String timeStamp) {
return redisUtils.get(timeStamp).toString();
}
}
下面是我的目录结构,大家可以做一个参考:
开始数据处理接口,接口马上响应成功,返回一个可以轮询查询的时间戳:
根据唯一标识时间戳,轮询查看处理过程:
IDEA控制台界面:连续的访问好几次RabbitMq接口,已经开始排队异步处理
?