Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。
Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本?
获取所有application
curl -s http://XXX:8088/ws/v1/cluster/apps
获取 state值为 RUNNING 的application任务
curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING?
获取这个任务单个信息?
curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state
jq,是linux一个很方便的json处理工具
通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。
安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)
yum install jq
编写shell脚本
由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。
#!/bin/bash
Joblist=`cat /opt/shell/logs/flink_job.log` #获取记录job的log文件
let i=0 #获取任务数
let log_count=0 #获取日志中的任务数
start_count=RUNNING #判断任务是否存在异常
############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
if [ -z "$Joblist" ]
then
while :
do
job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
else
echo ${job_id[$i]}
echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
let i++
fi
done
fi
############## 2 读取文件中JOB任务 ##################
let i=0
while read line
do
JOB[$i]=$line
let i++
done</opt/shell/logs/flink_job.log
log_count=$i #获取日志中的任务数
########### 3 判断任务状态,是否为RUNNIG,不是则邮件告警 ###############
for ((j=0;j<i;j++))
do
JOB_ID=${JOB[$j]//\"}
JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.state`
JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.name`
START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.startedTime` / 1000]
# echo "JOB_NAME: "$JOB_NAME
# echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
# echo "JOB_status: " ${JOB_status//\"}
#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
#echo "=============================================="
if [ ${JOB_status//\"} != "RUNNING" ];then
SUBJECT="【异常告警】Flink任务异常"
TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n\n目前状态: $JOB_status"
echo -e $TEXT | mail -s $SUBJECT 邮箱地址
start_count=erron
fi
done
########### 4 出现任务异常,重新读取job 任务记录到日志文件 ###############
let i=0
if [ $start_count == "erron" ];then
echo '重新写入日志文件'
while :
do
job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
elif [ $i == 0 ]; then
echo ${job_id[$i]}>/opt/shell/logs/flink_job.log
else
echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
fi
let i++
done
start_count=RUNNING
fi
########### 5 判断线上任务数是否一致,是否有新任务增加 ###############
let i=0
while :
do
job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
else
let i++
fi
done
let count=$i #线上任务数
echo "==========================线上最新RUNNING状态任务数: "$count
echo "==========================日志RUNNING状态任务数: "$log_count
if [ ! $count -eq $log_count ]; then
echo "现有RUNNING状态任务数不相等于已记录的任务数"
echo ${job_id[0]} >/opt/shell/logs/flink_job.log
for ((i=1;i<count;i++))
do
echo "重新写入JOB: "${job_id[$i]}
echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log
done
fi
echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
echo ================================================================================================
echo =====================================本次crontab监控结束========================================
echo ================================================================================================