This article takes a look at PowerJob's Alarmable.
tech/powerjob/server/extension/Alarmable.java
public interface Alarmable {
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
The Alarmable interface defines a single onFailed method, which takes alarm and targetUserList as parameters.
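To make the contract concrete, here is a minimal sketch of what another implementation of the interface could look like. The `Alarm`, `UserInfoDO` and `Alarmable` types below are simplified stand-ins declared only so that the example compiles on its own (the real ones live in PowerJob and carry more members), and `RecordingAlarmService` and `getUsername` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types so the sketch compiles on its own; the real ones live in PowerJob.
interface Alarm {
    String fetchTitle();
    String fetchContent();
}

class UserInfoDO {
    private final String username;

    UserInfoDO(String username) {
        this.username = username;
    }

    String getUsername() {
        return username;
    }
}

interface Alarmable {
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}

// Hypothetical channel that records one line per target user instead of sending anything.
class RecordingAlarmService implements Alarmable {
    final List<String> sent = new ArrayList<>();

    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        // Like the built-in channels, do nothing when there is nobody to notify.
        if (targetUserList == null || targetUserList.isEmpty()) {
            return;
        }
        for (UserInfoDO user : targetUserList) {
            sent.add(user.getUsername() + " <- " + alarm.fetchTitle());
        }
    }
}
```

Calling `onFailed` with two users appends two entries to `sent`; calling it with a null or empty list leaves `sent` untouched.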
public interface Alarm extends PowerSerializable {
String fetchTitle();
default String fetchContent() {
StringBuilder sb = new StringBuilder();
JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
content.forEach((key, originWord) -> {
sb.append(key).append(": ");
String word = String.valueOf(originWord);
if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
try {
if (originWord instanceof Long) {
word = CommonUtils.formatTime((Long) originWord);
}
}catch (Exception ignore) {
}
}
sb.append(word).append(OmsConstant.LINE_SEPARATOR);
});
return sb.toString();
}
}
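Alarm declares the fetchTitle method and provides a default fetchContent method, which serializes the alarm to JSON and renders each field as a "key: value" line, formatting Long values whose key ends in time or date as a readable timestamp. It has two implementations: JobInstanceAlarm and WorkflowInstanceAlarm.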
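The rendering rule in fetchContent can be sketched with the JDK alone. This is an approximation, not PowerJob's code: it takes a plain `Map` instead of a fastjson `JSONObject`, and the date pattern, the UTC zone and the `\n` separator are assumptions, since the source above does not show what `CommonUtils.formatTime` or `OmsConstant.LINE_SEPARATOR` resolve to. `AlarmContentSketch` and `render` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Map;

// Sketch of the key/value rendering done by Alarm.fetchContent(), using only the JDK.
class AlarmContentSketch {

    // Assumed pattern and time zone; the real CommonUtils.formatTime may differ.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    static String render(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder();
        fields.forEach((key, value) -> {
            String word = String.valueOf(value);
            String lowerKey = key.toLowerCase(Locale.ROOT);
            // Only Long values under keys ending in "time" or "date" are treated as epoch millis;
            // anything else is printed as-is.
            if ((lowerKey.endsWith("time") || lowerKey.endsWith("date")) && value instanceof Long) {
                word = FORMAT.format(Instant.ofEpochMilli((Long) value));
            }
            // '\n' is an assumed separator standing in for OmsConstant.LINE_SEPARATOR.
            sb.append(key).append(": ").append(word).append('\n');
        });
        return sb.toString();
    }
}
```

With an insertion-ordered map holding `jobName=demo` and `finishedTime=0L`, `render` yields the two lines `jobName: demo` and `finishedTime: 1970-01-01 00:00:00`. A non-Long value under a key such as `retryTime` is left unformatted, which matches the `instanceof Long` guard in the original.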
tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java
@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {
private final Environment environment;
private Long agentId;
private DingTalkUtils dingTalkUtils;
private Cache<String, String> mobile2UserIdCache;
private static final int CACHE_SIZE = 8192;
/**
* Guards against cache penetration: failed lookups are cached under this placeholder
*/
private static final String EMPTY_TAG = "EMPTY";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
if (dingTalkUtils == null) {
return;
}
Set<String> userIds = Sets.newHashSet();
targetUserList.forEach(user -> {
String phone = user.getPhone();
if (StringUtils.isEmpty(phone)) {
return;
}
try {
String userId = mobile2UserIdCache.get(phone, () -> {
try {
return dingTalkUtils.fetchUserIdByMobile(phone);
} catch (PowerJobException ignore) {
return EMPTY_TAG;
} catch (Exception ignore) {
return null;
}
});
if (!EMPTY_TAG.equals(userId)) {
userIds.add(userId);
}
}catch (Exception ignore) {
}
});
userIds.remove(null);
if (!userIds.isEmpty()) {
String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
try {
dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
}catch (Exception e) {
log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
}
}
}
@PostConstruct
public void init() {
String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
return;
}
if (!StringUtils.isNumeric(agentId)) {
log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
return;
}
this.agentId = Long.valueOf(agentId);
dingTalkUtils = new DingTalkUtils(appKey, appSecret);
mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
}
}
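DingTalkAlarmService implements the Alarmable interface. Its onFailed iterates over targetUserList, resolves each user's phone number to a DingTalk userId (cached in mobile2UserIdCache), and finally sends the alarm via dingTalkUtils.sendMarkdownAsync.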
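The EMPTY_TAG placeholder is a form of negative caching: a failed lookup is remembered so that the same phone number does not trigger another remote call. The sketch below illustrates the idea under simplifying assumptions. It swaps the Guava `Cache` for an unbounded `ConcurrentHashMap`, treats a null result as "not found" (the original instead caches the placeholder when a `PowerJobException` is thrown), and uses a hypothetical `remoteLookup` function in place of `dingTalkUtils.fetchUserIdByMobile`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of negative caching with a sentinel value, in the spirit of EMPTY_TAG.
class SentinelCacheSketch {

    private static final String EMPTY_TAG = "EMPTY";

    // Unbounded map for brevity; the original bounds its Guava cache with maximumSize.
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Hypothetical stand-in for the remote DingTalk lookup; returns null when no user matches.
    private final Function<String, String> remoteLookup;

    SentinelCacheSketch(Function<String, String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Returns the user id for a phone number, or null when the number is known to be unmapped.
    String userIdFor(String phone) {
        String cached = cache.computeIfAbsent(phone, p -> {
            String userId = remoteLookup.apply(p);
            // Store the sentinel for misses so the next call does not hit the remote API again.
            return userId == null ? EMPTY_TAG : userId;
        });
        return EMPTY_TAG.equals(cached) ? null : cached;
    }
}
```

Looking up the same unknown number twice invokes `remoteLookup` only once, because the second call finds the sentinel in the map. The trade-off is that a number registered later stays "unknown" until the entry is evicted, which an unbounded map never does; a bounded cache limits how long such stale misses can linger.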
tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java
@Slf4j
@Service
public class MailAlarmService implements Alarmable {
@Resource
private Environment environment;
private JavaMailSender javaMailSender;
@Value("${spring.mail.username:''}")
private String from;
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
return;
}
SimpleMailMessage sm = new SimpleMailMessage();
try {
sm.setFrom(from);
sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
sm.setSubject(alarm.fetchTitle());
sm.setText(alarm.fetchContent());
javaMailSender.send(sm);
}catch (Exception e) {
log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
}
}
@Autowired(required = false)
public void setJavaMailSender(JavaMailSender javaMailSender) {
this.javaMailSender = javaMailSender;
}
}
MailAlarmService implements the Alarmable interface. Its onFailed method builds a SimpleMailMessage and sends it via Spring's javaMailSender.send.
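The recipient list is built by a small stream pipeline that drops users without an email address. The sketch below isolates that step on plain strings so its edge case is easy to see; `MailRecipientsSketch` and `recipients` are hypothetical names, and the list of strings stands in for `targetUserList` mapped through `UserInfoDO::getEmail`.

```java
import java.util.List;
import java.util.Objects;

// Sketch of the recipient filtering used when filling SimpleMailMessage.setTo(...).
class MailRecipientsSketch {

    // Drops null addresses and keeps the remaining ones in their original order.
    static String[] recipients(List<String> emails) {
        return emails.stream()
                .filter(Objects::nonNull)
                .toArray(String[]::new);
    }
}
```

Note that the result can be an empty array when no user has an email configured, even though targetUserList itself is non-empty. In the original, any exception raised while sending in that situation would be caught and logged by the surrounding try/catch.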
tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java
@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {
private static final String HTTP_PROTOCOL_PREFIX = "http://";
private static final String HTTPS_PROTOCOL_PREFIX = "https://";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
if (CollectionUtils.isEmpty(targetUserList)) {
return;
}
targetUserList.forEach(user -> {
String webHook = user.getWebHook();
if (StringUtils.isEmpty(webHook)) {
return;
}
// Automatically prepend the protocol prefix
if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
webHook = HTTP_PROTOCOL_PREFIX + webHook;
}
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
try {
String response = HttpUtils.post(webHook, requestBody);
log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
}catch (Exception e) {
log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
}
});
}
}
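WebHookAlarmService implements the Alarmable interface. Its onFailed method iterates over targetUserList and calls HttpUtils.post(webHook, requestBody) for each user that has a webhook configured, using okhttp3 to make the HTTP callback.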
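The protocol-prefix handling is easy to check in isolation. The following sketch extracts it into a standalone method with the same two constants; `WebHookUrlSketch` and `normalize` are hypothetical names, and the behavior is meant to mirror the `startsWith` checks shown above rather than add anything new.

```java
// Sketch of the URL normalization performed before posting to a webhook.
class WebHookUrlSketch {

    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    // Returns the URL unchanged if it already has a scheme, otherwise defaults to plain HTTP.
    static String normalize(String webHook) {
        if (webHook.startsWith(HTTP_PROTOCOL_PREFIX) || webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
            return webHook;
        }
        return HTTP_PROTOCOL_PREFIX + webHook;
    }
}
```

Two consequences follow from this logic: a bare host such as `example.com/hook` is always sent over plain HTTP, and because `String.startsWith` is case-sensitive, a URL written as `HTTP://example.com` would get a second prefix. Users who need HTTPS should therefore store the full `https://` URL.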
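To summarize: PowerJob's Alarmable interface defines the onFailed method, which takes alarm and targetUserList as parameters. It has three implementations: DingTalkAlarmService (backed by DingTalkClient), MailAlarmService (backed by Spring's JavaMailSender), and WebHookAlarmService (backed by okhttp3's OkHttpClient).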