? ? ? ? 在一些场景,项目已发布了一段时间了,只是需要完善或优化一些功能要用到队列,但不想改动太大(或者不想在安装第三方MQ组件框架)的情况下可以用redis实现队列 。
? 使用redis的list的数据类型轻松实现有序队列,该队列每次存储时放在左边第1个,从右边最后一个取出即先进去先出来,该队列大多数场景都适用,如果不知道使用什么队列好可以先尝试试用该队列。
/**
* REDIS有序队列
*/
public void pushOrder(String json) {
stringRedisTemplate.opsForList().leftPush(KEY, json);
}
/**
* REDIS有序队列
*/
public String popOrder() {
return stringRedisTemplate.opsForList().rightPop(KEY);
}
?假设分布式服务下的另外的springboot项目消费队列示例代码:
@Service
public class RedisOrderProcess {
private static final Logger logger = LogManager.getLogger(RedisOrderProcess.class);
@Autowired
RedisServiceImpl redisService;
@PostConstruct
public void init() {
int cpuCount = Runtime.getRuntime().availableProcessors();
System.out.println("start RedisOrderProcess > cpus="+cpuCount);
processOrderImport(cpuCount);
}
private void processOrderImport(int cpus) {
ExecutorService executorService = new ThreadPoolExecutor(cpus, 1000,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue <Runnable>());
executorService.execute(() -> {
while (true) {
String json=null;
try{
json=redisService.popOrder();
if(json!=null){
//订单业务处理代码
}else{
Thread.sleep(500);
}
}catch (Exception e){
logger.error("err data> \n"+json,e);
e.printStackTrace();
}
}
});
}
}
使用redis的hash的数据类型轻松实现无序队列,而且队列保存中的内容是唯一的,往往用到的一些场景很特殊,而且在某些场景运用比正规的队列组件还爽。
?在使用springboot框架下轻松实现生产端:
/**
* 推送第三方设备状态
*
* @param id
* @param status
*/
public void pushStatus(String id, Integer status) {
if(status==null){
return;
}
String value = id+ ":" + status;
stringRedisTemplate.opsForSet().add("PUSH_STATUS", value);
}
?在使用springboot框架下轻松实现消费端:
@Service
public class RedisStatusProcess {
private static final Logger logger = LogManager.getLogger(RedisStatusProcess.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
@PostConstruct
public void init() {
int cpuCount = Runtime.getRuntime().availableProcessors();
System.out.println("start RedisStatusProcess > cpus="+cpuCount);
processStatusImport(cpuCount);
}
private void procesStatusImport(int cpus) {
ExecutorService executorService = new ThreadPoolExecutor(cpus, 1000,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue <Runnable>());
executorService.execute(() -> {
while (true) {
String json=null;
try{
json=stringRedisTemplate.opsForSet().pop("PUSH_STATUS")
if(json!=null){
//状态业务处理代码
}else{
Thread.sleep(200);
}
}catch (Exception e){
logger.error("err data> \n"+json,e);
e.printStackTrace();
}
}
});
}
}
? ? ? ?以上代码在我们的线上场景:我们当前2000个设备,每个设备每秒上传1个状态,如果使用有序队列就要 1分钟时间就要处理2000*60个状态,如果队列有延迟还意味着从队列获取的状态不是最新的,而且没有消费时可能会撑破内存。而使用无序队列就不存在问题,因为在队列中每个设备只允许一个状态,如果未消费则直接用最新的状态覆盖,拿出的基本都是最新的,也不用一定要消费到每个队列状态。?
安装与使用redis教程 ->?http://t.csdnimg.cn/1Ltzm