日常开发后端接口时,总是会遇到一些和业务关联性不是很大却又很耗时的操作,由于功能的重要性和体量远达不到要上消息中间件的情况,这时候我们就可以实现一个简单的生产者消费者模型来实现异步消费。就以笔者这篇文章为例,通过JUC包下的阻塞队列实现了一个简单的并发同步模型。
本文整体结构如下,通过笔者的代码示例,你会对生产者消费者这种并发同步的设计模式的开发模型和使用场景有着更进一步的理解。
简单来说生产者消费者模型就是让多线程去异步消费生产者的任务,对于web开发而言,我们的生产者可以是任意的HTTP请求,这些HTTP请求会将一些耗时操作提交到队列中让消费者进行消费(这里消费者可以是一个异步的线程或者线程池,具体看读者的业务场景)
所以我们的实现思路如下:
这里笔者假设耗时的操作是对一个第三方接口的请求,所以笔者在封装任务时,只需在任务中声明调第三方接口的参数即可:
/**
* 要被执行的任务
*/
@Data
public class Task {
/**
* 任务id
*/
private Long id;
/**
* 任务名称
*/
private String taskName;
/**
* 请求参数
*/
private JSONObject params;
/**
* 创建时间
*/
private DateTime createTime;
/**
* 结束时间
*/
private DateTime finishTime;
}
为了方便管理,我们将阻塞队列以聚合的方式封装一个QueueBean交给Spring进行管理,注意笔者这里声明的阻塞队列的容量为2000仅仅是示例,具体数值读者需要结合压测进行调整,参考StackOverflow的回答一般建议设置为可分配的堆内存大小除以对象平均字节数:
Make it “as large as is reasonable”. For example, if you are OK with it consuming up to 1Gb of memory, then allocate its size to be 1Gb divided by the average number of bytes of the objects in the queue.
@Component
@Slf4j
public class QueueBean {
private BlockingQueue<Task> blockingQueue = new ArrayBlockingQueue<>(2000);
@SneakyThrows
public void put(Task task) {
blockingQueue.put(task);
}
@SneakyThrows
public Task take() {
return blockingQueue.take();
}
}
我们的HTTP请求就是一个生产者,所以在业务执行过程中,笔者将耗时的三方请求封装为Task提交到阻塞队列中交由消费者异步消费:
@Autowired
private QueueBean queueBean;
@PostMapping("/submitTask")
public String submitTask() {
Task task = new Task();
long id = snowflake.nextId();
task.setId(id);
task.setTaskName("任务-" + id);
task.setParams(new JSONObject().putOnce("userName", RandomUtil.randomString(5)));
task.setCreateTime(new DateTime());
task.setFinishTime(new DateTime());
log.info("提交任务:{}", JSONUtil.toJsonStr(task));
queueBean.put(task);
return "success";
}
因为消费者的执行逻辑需要提交到线程池中让池中的线程进行处理,所以我们这里封装了一个消费的Runnable ,因为这个Runnable不受Spring容器管理,所以获取Spring容器中的队列可以采用hutool封装的SpringUtil上下文,当然如果了解Spring扩展点的读者也可以采用ApplicationContext获取阻塞队列,而笔者对于任务消费逻辑比较简单,仅仅打印一下任务信息:
@Slf4j
public class ConsumerTask implements Runnable {
@Override
public void run() {
QueueBean queueBean = SpringUtil.getBean(QueueBean.class);
while (true) {
//从阻塞队列中获取任务
Task task = queueBean.take();
log.info("消费者消费任务,任务详情:{}", JSONUtil.toJsonStr(task));
}
}
}
完成消费者的封装之后,我们采用线程池的方式创建线程来执行消费者的逻辑,可以看到笔者采用Spring后置的扩展点,确保在线程池的Bean完成加载之后对线程池进行初始化,并提交5个消费者。
private static ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create()
.setCorePoolSize(Runtime.getRuntime().availableProcessors())
.setMaxPoolSize(Runtime.getRuntime().availableProcessors() << 2)
.setThreadFactory(new NamedThreadFactory("consumerTask-", false))
.build();
@PostConstruct
private void init() {
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(new ConsumerTask());
}
}
我们将应用启动后可以看到下面这段输出,不难看出每当我们的HTTP请求提交一个任务到队列中后,总有一个线程池中的线程出来消费者任务,两者高效并发同步的同时又能保证线程安全:
2024-01-02 15:20:31.328 INFO 12084 --- [nio-8080-exec-8] c.s.q.controller.BasicController : 提交任务:{"id":1742083432578711552,"taskName":"任务-1742083432578711552","params":{"userName":"9b53q"},"createTime":1704180031328,"finishTime":1704180031328}
2024-01-02 15:20:31.329 INFO 12084 --- [ consumerTask-1] c.s.queueSync.task.ConsumerTask : 消费者消费任务,任务详情:{"id":1742083432578711552,"taskName":"任务-1742083432578711552","params":{"userName":"9b53q"},"createTime":1704180031328,"finishTime":1704180031328}
2024-01-02 15:20:31.934 INFO 12084 --- [io-8080-exec-10] c.s.q.controller.BasicController : 提交任务:{"id":1742083435116265472,"taskName":"任务-1742083435116265472","params":{"userName":"xwoth"},"createTime":1704180031933,"finishTime":1704180031933}
2024-01-02 15:20:31.934 INFO 12084 --- [ consumerTask-3] c.s.queueSync.task.ConsumerTask : 消费者消费任务,任务详情:{"id":1742083435116265472,"taskName":"任务-1742083435116265472","params":{"userName":"xwoth"},"createTime":1704180031933,"finishTime":1704180031933}
2024-01-02 15:20:32.623 INFO 12084 --- [nio-8080-exec-1] c.s.q.controller.BasicController : 提交任务:{"id":1742083438010335232,"taskName":"任务-1742083438010335232","params":{"userName":"2udas"},"createTime":1704180032623,"finishTime":1704180032623}
2024-01-02 15:20:32.624 INFO 12084 --- [ consumerTask-2] c.s.queueSync.task.ConsumerTask : 消费者消费任务,任务详情:{"id":1742083438010335232,"taskName":"任务-1742083438010335232","params":{"userName":"2udas"},"createTime":1704180032623,"finishTime":1704180032623}
笔者在这里仅仅是实现了一个比较简单的并发同步模型,该模型只能算是一个比较实用的示例版本,后续笔者会考虑补充异步消费失败等兜底策略,感兴趣的读者可以点点关注。
我是sharkchili,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号:
写代码的SharkChili,同时我的公众号也有我精心整理的并发编程、JVM、MySQL数据库个人专栏导航。
SpringBoot —— 基于BlockingQueue实现的生产者消费者用例:https://zealon.cn/article/23
How to estimate or calculate the size of the ArrayBlockingQueue:https://stackoverflow.com/questions/7958567/how-to-estimate-or-calculate-the-size-of-the-arrayblockingqueue