本程序为Scala通过http获取yarn任务列表,判断任务执行时长,并将超时任务写入MySQL,且发送邮件提醒。
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
/**
* http获取yarn任务列表
*/
object HttpGetResponse {
def getResponse(url: String, header: String = null): String = {
val httpClient: CloseableHttpClient = HttpClients.createDefault() // 创建 client 实例
val get = new HttpGet(url) // 创建 get 实例
if (header != null) { // 设置 header
val json: JSONObject = JSON.parseObject(header)
json.keySet().toArray.map((_: AnyRef).toString).foreach((key: String) => get.setHeader(key, json.getString(key)))
}
val response: CloseableHttpResponse = httpClient.execute(get) // 发送请求
EntityUtils.toString(response.getEntity) // 获取返回结果
}
}
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import scala.collection.mutable.ListBuffer
/**
* 解析MR任务的json
*/
object analysisTaskUtil {
def analysisJson(jsonStr: String): List[List[String]] = {
//创建结果集
var resultList: ListBuffer[List[String]] = scala.collection.mutable.ListBuffer[List[String]]()
//resultList += "aa"
//解析返回json
val jsonOBJ: JSONObject = JSON.parseObject(jsonStr)
val apps: String = jsonOBJ.getString("apps")
val appsJson: JSONObject = JSON.parseObject(apps)
val appJsonArray: JSONArray = appsJson.getJSONArray("app")
//获取任务个数
if (appJsonArray != null) {
//遍历每个任务返回获取值
for (index <- appJsonArray.toArray().indices) {
val result: JSONObject = appJsonArray.getJSONObject(index)
val id: String = result.getString("id")
val user: String = result.getString("user")
val queue: String = result.getString("queue")
val state: String = result.getString("state")
val startedTime: String = result.getString("startedTime")
val name: String = result.getString("name")
val ApplicationType: String = result.getString("applicationType")
resultList += List(id, user, queue, state, startedTime,name,ApplicationType)
}
}
resultList.toList
}
import java.io.{BufferedInputStream, FileInputStream}
import java.util.Properties
/**
* 读取配置信息
*/
object propertiesUtil {
//读取配置文件
def getProperties(fileName: String): Properties = {
val prop = new Properties()
val inputStream = new BufferedInputStream(new FileInputStream(fileName))
prop.load(inputStream)
prop
}
}
import java.text.{ParseException, SimpleDateFormat}
import java.util.Date
/**
* 时间处理类
*/
object timeUtil {
val df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def tranTimeToString(tm: String): String = {
val tim: String = df.format(new Date(tm.toLong))
tim
}
def NowDate(): String = {
val now: Date = new Date()
val date: String = df.format(now)
date
}
def timediff(startTime: String, endTime: String): Long = {
var min: Long = 0
try {
val startDate: Date = df.parse(startTime)
val endDate: Date = df.parse(endTime)
val diff: Long = endDate.getTime - startDate.getTime
if (diff < 0) sys.error("输入时间错误")
min = diff / 1000 / 60
} catch {
case e: ParseException => e.printStackTrace()
}
min
}
}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.Logger
import play.api.libs.mailer._
import java.io.{FileReader, InputStreamReader}
/**
* 发送email
*/
object EmailUtil {
def sendEmail(ehost:String,
eport:Int,
euser:String ,
epassword:String,
subject: String,
content: List[List[String]],
to: Seq[String]): Unit = {
val logger: Logger = Logger.getLogger(EmailUtil.getClass)
val from: String = euser //填写邮件发送地址
val subject_pre = " " //邮件主题告警前缀
var emailText = ""
content.foreach{
task=>
// emailText += s"""|<p align="left">${con}</p> \n"""
emailText += s"""|<tr>
|<td>${task.head}</td>
|<td>${task(1)}</td>
|<td>${task(2)}</td>
|<td>${task(3)}</td>
|<td>${task(4)}</td>
|<td>${task(5)}</td>
|<td>${task(6)}</td>
|</tr> \n"""
}
val bodyHtml: Option[String] = Option(
s"""
|<html>
|<body>
|<h1 align="center">${subject_pre} ${subject}</h1>
|<br />
|<h3 align="left">超时任务:</h3>
|<table border="1px red solid" cellspacing="0">
|<tr>
|<th>application_id</th>
|<th>queue</th>
|<th>user</th>
|<th>applicationtype</th>
|<th>started_time</th>
|<th>duration</th>
|<th>name</th>
|</tr>
${emailText}
|</table>
|<br />
|<br />
|<br />
|<h3 align="center">请及时处理</h3>
|</body>
|</html>
""".stripMargin)
val charset: Option[String] = Option("utf-8") // 字符编码 默认utf-8
// 生成邮件
val email: Email = Email(subject, from, to, None, bodyHtml, charset)
// STMP服务参数
val host: String = ehost
// STMP服务端口号
val port: Int = eport
// STMP服务发送者用户邮箱
val user: Option[String] = Option(euser)
// 在126网站中开通smtp服务,会返回一个密码
// 这个密码很重要,填错了发不了邮件
val password: Option[String] = Option(epassword)
val timeout: Option[Int] = Option(10000)
//setSocketTimeout 默认60s
val connectionTimeout: Option[Int] = Option(10000)
//setSocketConnectionTimeout 默认60s
// STMP服务SMTPConfiguration
val configuration: SMTPConfiguration = new SMTPConfiguration(
host, port, false, false, false,
user, password, false, timeout,
connectionTimeout, ConfigFactory.empty(), false)
val mailer: SMTPMailer = new SMTPMailer(configuration)
// 发送邮件
mailer.send(email)
logger.info("==========邮件已发送成功!!!==========")
}
import com.sinosig.tianshu.utils.{EmailUtil, HttpGetResponse, analysisTaskUtil, propertiesUtil, timeUtil}
import org.apache.log4j.Logger
import java.util.Properties
import scala.collection.mutable.ListBuffer
import java.sql.{Connection, DriverManager}
/**
* 定时监控长任务,任务超时邮件告警
*/
object LongTaskMonitoring {
val logger: Logger = Logger.getLogger(LongTaskMonitoring.getClass)
def main(args: Array[String]): Unit = {
if (args.length != 1) {
sys.error("缺少文件路径")
}
val filePath: String = args(0)
val prop: Properties = propertiesUtil.getProperties(filePath)
val yarnHost: String = prop.getProperty("yarnHost")
val emailHost: String = prop.getProperty("emailHost")
val emailPort: Int = prop.getProperty("emailPort").toInt
val emailUser: String = prop.getProperty("emailUser")
val emailPassword: String = prop.getProperty("emailPassword")
val taskTime: Int = prop.getProperty("taskTime").toInt
val emailTaskTime: Int = prop.getProperty("emailTaskTime").toInt
val dbdriver: String = prop.getProperty("db.driver")
val dburl: String = prop.getProperty("db.url")
val dbuser: String = prop.getProperty("db.user")
val dbpassword: String = prop.getProperty("db.password")
val ApplicationType: String = prop.getProperty("ApplicationType")
// val dburl: String = args(1)
// val dbuser: String = args(2)
// val dbpassword: String = args(3)
logger.info("==========配置获取完成==========")
Class.forName(dbdriver)
val connection = DriverManager.getConnection(dburl, dbuser, dbpassword)
val statement = connection.createStatement
val warnLongTask_email: ListBuffer[List[String]] = scala.collection.mutable.ListBuffer[List[String]]()
val warnLongTask_db: ListBuffer[List[String]] = scala.collection.mutable.ListBuffer[List[String]]()
//遍历队列,分别获取任务列表
val yarnQueues = statement.executeQuery("select distinct queue from long_task_minitor_properties")
while (yarnQueues.next()) {
val queue = yarnQueues.getString("queue")
val yarnHttpApi = s"http://$yarnHost:8088/ws/v1/cluster/apps?queue=$queue"
val getResult = HttpGetResponse.getResponse(yarnHttpApi)
//解析返回json
val taskList: List[List[String]] = analysisTaskUtil.analysisJson(getResult)
//遍历结果集,判断任务是否超时
taskList.foreach {
task =>
val taskTimeDiff = timeUtil.timediff(timeUtil.tranTimeToString(task(4)), timeUtil.NowDate())
if (task(6).toLowerCase.contains(ApplicationType)) {
}
else {
if (taskTimeDiff > taskTime) {
warnLongTask_db += task
if (taskTimeDiff > emailTaskTime) {
warnLongTask_email += task
}
}
}
}
logger.info(s"""==========${yarnQueues}队列任务列表获取完成==========""")
}
//超时任务写入db
val applications = statement.executeQuery("select application_id from long_task_minitor_log where update_time > DATE_SUB(now(), INTERVAL 2 DAY) ")
var applicationLis = ListBuffer[String]()
while (applications.next()) {
applicationLis += applications.getString("application_id")
}
if (warnLongTask_db.nonEmpty) {
val groups = statement.executeQuery("select queue,user,group_name from long_task_minitor_properties")
val users_email = ListBuffer[(String, String, String)]()
while (groups.next()) {
val db_queue = groups.getString("queue")
val db_user = groups.getString("user")
val group = groups.getString("group_name")
users_email += ((db_queue, db_user, group))
}
// 对每个用户发送邮件
users_email.foreach { case (db_queue, db_user, group) =>
warnLongTask_db.foreach { task =>
if (db_queue == task(2) && db_user == task(1)) {
val group_name = group
val duration = Math.round(timeUtil.timediff(timeUtil.tranTimeToString(task(4)), timeUtil.NowDate()))
val insert_sql = s"('${task.head}','${task(2)}','${task(1)}','$group_name','${task(6)}','${timeUtil.tranTimeToString(task(4))}','$duration','${task(5)}','N')"
val oql = s"replace into long_task_minitor_log (application_id,queue,user,group_name,applicationtype,started_time,duration,name,end_flag) values $insert_sql"
if (applicationLis.contains(task.head)) {
statement.executeUpdate(oql)
logger.info(s"==========${task.head}近期未结束任务再次写入数据库成功============")
var currentApllicList = List(task.head)
applicationLis --= currentApllicList
} else {
statement.executeUpdate(oql)
logger.info(s"==========${task.head}新增任务插入数据库成功============")
}
}
}
}
}
applicationLis.foreach {
task =>
val sql = s"UPDATE long_task_minitor_log SET end_flag='Y' WHERE application_id='$task' "
statement.executeUpdate(sql)
}
//超时任务告警
if (warnLongTask_email.nonEmpty) {
val subject = "任务超时告警"
// 查询用户邮箱信息
val rs = statement.executeQuery("select queue,user,email from long_task_minitor_properties")
val users_email = ListBuffer[(String, String, String)]()
while (rs.next()) {
val queue = rs.getString("queue")
val user = rs.getString("user")
val email = rs.getString("email")
users_email += ((queue, user, email))
}
// 对每个用户发送邮件
users_email.foreach { case (queue, user, email) =>
val content: ListBuffer[List[String]] = scala.collection.mutable.ListBuffer[List[String]]()
// 获取对应用户的报错信息
val userTasks = warnLongTask_email.filter(task => queue == task(2) && user == task(1))
userTasks.foreach {
task =>
val duration = timeUtil.timediff(timeUtil.tranTimeToString(task(4)), timeUtil.NowDate())
val info = s"超时任务用户:${task(1)},超时任务ID:${task.head}"
logger.info(info)
val lst = s"${task.head},${task(2)},${task(1)},${task(6)},${timeUtil.tranTimeToString(task(4))},$duration,${task(5)}".split(",").toList
content += lst
}
// 如果有报错信息,则发送邮件
if (content.nonEmpty) {
if (email.contains(";")) {
val to = email.split(";").toSeq
EmailUtil.sendEmail(emailHost, emailPort, emailUser, emailPassword, subject, content.toList, to)
} else {
val to = Seq(email)
EmailUtil.sendEmail(emailHost, emailPort, emailUser, emailPassword, subject, content.toList, to)
}
}
}
}
logger.info("==========本次检测完成!!!==========")
statement.close()
}
}