上一篇文章记录了 RocketMQ 的整体架构、安装部署和应用场景这三部分内容。在熟悉了 RocketMQ 的相关核心概念之后，本文记录如何基于 SpringBoot 整合 RocketMQ 实现异步发送短信的功能，其中会涉及阿里云短信服务的相关内容。
<!-- RocketMQ Spring Boot starter, version 2.2.2 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
# RocketMQ configuration
rocketmq:
  # Address (host:port) of the RocketMQ name server
  name-server: 192.168.57.129:9876
  producer:
    # Producer group used when sending messages
    group: test-group
平台系统有一个“一键通知”按钮。该功能用于给家长发送短信，引导家长到微信公众号关注孩子的视力、屈光、眼轴等信息；家长若在问卷倒计时内完成问卷信息的填写，即可解锁未来预测功能，预测视力未来的发展情况。
查询当前任务中已经完成筛查的记录，若某条记录的筛查项目全部无数据，就需要将其过滤掉；同时需要为学生创建对应任务的问卷日期，用来记录问卷倒计时。此外，家长需要先在微信公众号绑定该学生才能收到短信通知，没有绑定则收不到短信通知。
/**
 * One-click notification endpoint.
 *
 * <p>Looks up the completed, non-deleted screening records of the given task and hands
 * them to {@code taskService.filter}, which drops empty records, creates the
 * questionnaire countdown and triggers the SMS notification.
 *
 * @param taskId id of the screening task whose records should be notified
 * @return a success result once the records have been handed over
 */
@Login
@RequestMapping("/notice")
public Result notice(@RequestParam(value = "taskId") Integer taskId) {
    // Restrict the query to finished, non-deleted records of this task.
    ScreeningRecordQueryCondition condition = new ScreeningRecordQueryCondition();
    condition.setStatus(ScreeningStatus.DONE.getCode());
    condition.setIsDelete(YNEnum.N);
    condition.setTaskId(taskId);

    // Filter empty data -> create questionnaire countdown -> send SMS notification.
    taskService.filter(screeningRecordService.findByCondition(condition));
    return Result.success();
}
/**
 * Drops screening records without any measurement data and triggers a notification
 * for each remaining record.
 *
 * <p>Filtering is completed for the whole list before the first notification is issued.
 *
 * @param screeningRecords records to process; a null or empty list is a no-op
 */
public void filter(List<TScreeningRecord> screeningRecords) {
    if (CollectionUtils.isEmpty(screeningRecords)) {
        return;
    }

    // Students whose screening items are all empty are not shown in the archive
    // after the task ends, and their parents are not notified either.
    List<TScreeningRecord> withData = screeningRecords.stream()
            .filter(this::hasAnyMeasurement)
            .collect(Collectors.toList());

    for (TScreeningRecord candidate : withData) {
        screeningRecordHelper.notice(candidate);
    }
}

/** Returns true when at least one of vision, refraction or axis is non-empty. */
private boolean hasAnyMeasurement(TScreeningRecord candidate) {
    if (StringUtils.isNotEmpty(candidate.getVision())) {
        return true;
    }
    if (StringUtils.isNotEmpty(candidate.getRefraction())) {
        return true;
    }
    return StringUtils.isNotEmpty(candidate.getAxis());
}
@Slf4j
@Service
public class ScreeningRecordHelper {
@Autowired
ParentRelationService parentRelationService;
@Autowired
ParentService parentService;
@Autowired
StudentService studentService;
@Autowired
GenerateQuestionnaireDateService GenerateQuestionnaireDateService;
@Autowired
SmsService smsService;
@Autowired
ScreeningRecordService screeningRecordService;
@Autowired
MQProducer mqProducer;
/**
* 将筛查记录发送通知给学生家长
*
* @param screeningRecord 筛查记录
*/
public void notice(TScreeningRecord screeningRecord) {
CompletableFuture.runAsync(() -> {
Integer studentId = screeningRecord.getStudentId();
Integer taskId = screeningRecord.getTaskId();
GenerateQuestionnaireDateQueryCondition queryCondition = new GenerateQuestionnaireDateQueryCondition();
queryCondition.setStudentId(studentId);
queryCondition.setTaskId(taskId);
List<TGenerateQuestionnaireDate> generateQuestionnaireDates = GenerateQuestionnaireDateService.findByCondition(queryCondition);
if (CollectionUtils.isEmpty(generateQuestionnaireDates)) {
// 创建生成问卷时间记录
GenerateQuestionnaireDateService.create(taskId, studentId, new Date());
}
// 查找学生对应的家长
List<TParentRelation> parentRelations = parentRelationService.findByBizTypeBizId(ParentRelationBizType.STUDENT.getCode(), String.valueOf(studentId));
if (CollectionUtils.isEmpty(parentRelations)) {
return;
}
TParentRelation relation = parentRelations.get(0);
TParent parent = parentService.findById(relation.getParentId());
// 获取学生信息
TStudent student = studentService.findById(studentId);
// 发送筛查记录通知模板消息
if (student != null) {
// 发送异步消息
ScreeningRecordSendStatusChangeMsg msg = new ScreeningRecordSendStatusChangeMsg();
msg.setRecordId(screeningRecord.getId());
msg.setPhone(parent.getPhone());
msg.setStudentName(student.getName());
msg.setOldSendStatus(screeningRecord.getSendStatus().intValue());
msg.setNewSendStatus(YNEnum.Y.getCode());
mqProducer.sendAsync(Topic.SCREENING_RECORD_SEND_STATUS_CHANGE.getTopic(), Topic.SCREENING_RECORD_SEND_STATUS_CHANGE.getTag(), JSON.toJSONString(msg));
}
});
}
}
/**
 * Thin wrapper around {@link RocketMQTemplate} for sending messages.
 */
@Service
public class MQProducer {
    private static final Logger log = LoggerFactory.getLogger(MQProducer.class);

    /**
     * Default send timeout, 3000 milliseconds. Not referenced by {@link #sendAsync}.
     */
    private static final int DEFAULT_TIMEOUT = 3000;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message asynchronously; the outcome is reported to a callback that logs it.
     *
     * @param topic   topic name
     * @param tag     tag; when null or empty the message is sent to the bare topic
     * @param msgBody message content
     */
    public void sendAsync(String topic, String tag, String msgBody) {
        rocketMQTemplate.asyncSend(destinationOf(topic, tag), msgBody, new LoggingCallback(topic, tag, msgBody));
    }

    /** Builds the destination string: "topic:tag" when a tag is given, otherwise just the topic. */
    private static String destinationOf(String topic, String tag) {
        if (tag == null || tag.isEmpty()) {
            return topic;
        }
        return topic + ":" + tag;
    }

    /** Send callback that logs success and failure together with the message details. */
    private static final class LoggingCallback implements SendCallback {
        private final String topic;
        private final String tag;
        private final String msgBody;

        LoggingCallback(String topic, String tag, String msgBody) {
            this.topic = topic;
            this.tag = tag;
            this.msgBody = msgBody;
        }

        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("sendAsync success, topic:{}, tag:{}, msg:{}", topic, tag, msgBody);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("sendAsync fail, topic:{}, tag:{}, msg:{}", topic, tag, msgBody, throwable);
        }
    }
}
/**
 * Consumes send-status-change messages: sends the notice SMS and marks the screening
 * record as sent, provided the record still exists and is not deleted.
 */
@Component
@RocketMQMessageListener(topic = "screening-record-topic", consumerGroup = "screening-record-consumer_send-status-change", selectorExpression = "send-status-change")
public class ScreeningRecordSendStatusChangeConsumer implements RocketMQListener<ScreeningRecordSendStatusChangeMsg> {
    private static final Logger log = LoggerFactory.getLogger(ScreeningRecordSendStatusChangeConsumer.class);

    @Autowired
    ScreeningRecordService screeningRecordService;
    @Autowired
    SmsService smsService;

    /**
     * Handles one message.
     *
     * @param message payload carrying the record id, the phone number and the student name
     */
    @Override
    public void onMessage(ScreeningRecordSendStatusChangeMsg message) {
        log.info("接收到MQ消息, topic:screening-record-topic, tag:send-status-change, message:{}", message);

        TScreeningRecord found = screeningRecordService.findById(message.getRecordId());
        if (isNotDeleted(found)) {
            // The student name is passed for both name arguments of the notice SMS.
            String phone = message.getPhone();
            String studentName = message.getStudentName();
            smsService.sendNoticeMsg(phone, studentName, studentName);
            screeningRecordService.updateSendStatus(YNEnum.Y.getCode(), message.getRecordId());
        }

        log.info("处理完成MQ消息, topic:screening-record-topic, tag:send-status-change, message:{}", message);
    }

    /** Returns true when the record exists and its delete flag equals the "N" code. */
    private boolean isNotDeleted(TScreeningRecord found) {
        if (found == null) {
            return false;
        }
        return found.getIsDelete().equals(YNEnum.N.getCode().byteValue());
    }
}
# SMS gateway settings read through SmsProperties
sms:
  # Base URL of the SMS gateway, resolved from the sms.baseurl placeholder
  base-url: ${sms.baseurl}
  # Path of the send-SMS endpoint
  uri: /sms-api/sms/send
  # Path of the token endpoint
  token-uri: /sms-api/authen/token/create?userName={userName}
  # Masked in this article
  user: ******
  # Masked in this article
  sign: ***
  # NOTE(review): SmsService reads smsProperties.getNotice(), but only a "verification"
  # section is shown here; presumably a matching "notice" section exists - confirm.
  verification:
    template: SMS_464********
    # Format string with two %s placeholders
    param: '{"args0":"%s","args1":"%s"}'
/**
 * Sends SMS messages through the HTTP SMS gateway configured in {@code SmsProperties}.
 */
@Service
public class SmsService {
    private static final Logger log = LoggerFactory.getLogger(SmsService.class);

    // Created in init() once smsProperties has been injected.
    WebClient webClient;

    @Autowired
    SmsProperties smsProperties;

    /** Builds the web client against the configured base URL. */
    @PostConstruct
    public void init() {
        webClient = WebClient.create(smsProperties.getBaseUrl());
    }

    /**
     * Sends the one-click notice SMS.
     *
     * <p>{@code name} and {@code student} are substituted, in that order, into the notice
     * parameter template via {@link String#format}.
     *
     * @param phone   mobile phone number
     * @param name    student name (first template argument)
     * @param student student name (second template argument)
     */
    public void sendNoticeMsg(String phone, String name, String student) {
        // NOTE(review): the values are inserted into the template without escaping;
        // presumably names never contain quotes or backslashes - confirm.
        String param = String.format(smsProperties.getNotice().getParam(), name, student);
        send(phone, smsProperties.getNotice().getTemplate(), param, smsProperties.getSign());
    }

    /**
     * Posts one SMS request to the gateway. The call is not awaited: the response or the
     * failure is logged when it arrives.
     *
     * @param mobile      mobile number; domestic: an 11-digit number with a +/+86/0086/86 prefix
     *                    or no prefix; international/HK/Macao/Taiwan: country code + number
     * @param smsTemplate SMS template
     * @param smsParm     SMS template parameters
     * @param signName    SMS signature
     */
    private void send(String mobile, String smsTemplate, String smsParm, String signName) {
        // Request body
        SmsRequestDTO smsRequestDTO = new SmsRequestDTO();
        smsRequestDTO.setMobile(mobile);
        smsRequestDTO.setSmsTemplete(smsTemplate);
        smsRequestDTO.setSmsParm(smsParm);
        smsRequestDTO.setSignName(signName);

        Mono<SmsResponseDTO> mono = webClient
                .post()
                .uri(smsProperties.getUri())
                .contentType(MediaType.APPLICATION_JSON)
                .header(HttpHeaders.AUTHORIZATION, getSmsToken(smsProperties.getUser()))
                .bodyValue(smsRequestDTO)
                .retrieve()
                .bodyToMono(SmsResponseDTO.class);

        // Subscribe with an error consumer so a failed request is logged with its request context.
        mono.subscribe(
                result -> log.info("发送短信完成, smsRequest:{}, smsResponse:{}", smsRequestDTO, result),
                error -> log.error("发送短信失败, smsRequest:{}", smsRequestDTO, error));
    }
}