CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。
使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
Controller:
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@Resource
private CountDownService countDownService;
/**
* CountDownLatch实现异步多线程不同业务处理,不同service,异常情况回滚全部子线程
*
* @return
*/
@GetMapping("/countDown/handleDataBack")
public String countDownHandleDataBack() {
countDownService.handleDataBack();
return "success";
}
Sevice:
@Service
@Slf4j
public class CountDownService {
@Resource
private TestMapper testMapper;
@Resource
private ApplicationContext applicationContext;
/**
* 主线程,同时调用多个个子线程进行业务处理,当其中一个子线程出现异常,则全部子线程进行回滚
*/
@Transactional(rollbackFor = Exception.class)
public void handleDataBack() {
AtomicBoolean errorTag = new AtomicBoolean(false);
// 设置countDown大小,与异步执行的业务数量一样,比如2个
CountDownLatch countDownLatch = new CountDownLatch(2);
// 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚
CountDownLatch errorCountDown = new CountDownLatch(1);
// 异步调用其他Service,执行业务处理
CountDownService bean = applicationContext.getBean(CountDownService.class);
bean.handleTestOne(countDownLatch, errorCountDown, errorTag);
bean.handleTestTwo(countDownLatch, errorCountDown, errorTag);
try {
// 主线程阻塞
countDownLatch.await();
// 可以设置最大阻塞时间,防止线程一直挂起
/*boolean await = countDownLatch.await(1, TimeUnit.SECONDS);
if (!await) {
// 超过时间子线程都还没有结束,直接都回滚
errorTag.set(true);
}*/
log.info("继续执行主线程");
// 继续执行后续的操作,比如insert、update等
TestEntity entity = new TestEntity();
entity.setId(new Random().nextInt(999999999));
entity.setCount(1);
entity.setCommodityCode("handleTestMain");
entity.setMoney(new Random().nextInt(1000000));
entity.setUserId("user-handleTestMain");
testMapper.insert(entity);
} catch (Exception e) {
log.error("主线程业务执行异常");
errorTag.set(true);
} finally {
// 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中
errorCountDown.countDown();
}
// 如果出现异常
if (errorTag.get()) {
throw new RuntimeException("异步业务执行出现异常");
}
log.info("主线程执行完成");
}
/**
* 子线程具体业务处理
*/
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@Async
public void handleTestOne(CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
log.info("开始执行handleTestOne线程");
// 模拟业务耗时
ThreadUtil.sleep(2000);
try {
// 执行数据库操作
TestEntity entity = new TestEntity();
entity.setId(new Random().nextInt(999999999));
entity.setCount(1);
entity.setCommodityCode("handleTestOne");
entity.setMoney(new Random().nextInt(1000000));
entity.setUserId("user-handleTestOne");
testMapper.insert(entity);
// 模拟出现异常
int a = 1 / 0;
} catch (Exception e) {
errorTag.set(true);
}
// 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
countDownLatch.countDown();
// 子阻塞,直到其他子线程完成操作
try {
errorCountDown.await();
} catch (Exception e) {
errorTag.set(true);
}
log.info("handleTestOne-子线程执行完成");
if (errorTag.get()) {
// 抛出异常,回滚数据
throw new RuntimeException("handleTestOne-子线程业务执行异常");
}
}
/**
* 子线程具体业务处理
*/
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@Async
public void handleTestTwo(CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
log.info("开始执行handleTestTwo线程");
// 模拟业务耗时
ThreadUtil.sleep(500);
try {
// 执行数据库操作
TestEntity entity = new TestEntity();
entity.setId(new Random().nextInt(999999999));
entity.setCount(1);
entity.setCommodityCode("handleTestTwo");
entity.setMoney(new Random().nextInt(1000000));
entity.setUserId("user-handleTestTwo");
testMapper.insert(entity);
} catch (Exception e) {
errorTag.set(true);
}
// 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
countDownLatch.countDown();
// 子阻塞,直到其他子线程完成操作
try {
errorCountDown.await();
} catch (Exception e) {
errorTag.set(true);
}
log.info("handleTestTwo-子线程执行完成");
if (errorTag.get()) {
// 抛出异常,回滚数据
throw new RuntimeException("handleTestTwo-子线程业务执行异常");
}
}
}