在实际工作中,LSF 计算节点可能因为硬件故障导致运行在其上的作业状态变为未知(unknown)。此时需要通知作业属主终止作业,并重新提交。为了降低工作量,最好能够自动发送通知。
我们可以通过 LSF Python API 接口收集状态为未知的作业,获取作业的属主、执行主机名称、队列等信息,根据这些信息生成通知,通过邮件、飞书或其它方式发送给用户。
通知示例如下:
下面是通过 LSF API 获取作业状态并通过飞书发送通知给用户的示例代码。
#!/opt/miniconda3/bin/python
import os
import re
import datetime
import shutil
import sys
import stat
import traceback
import time
import yaml
import logging
import logging.config
import re
import urllib3
import requests
import json
import string
import tempfile
import errno
import urllib3
urllib3.disable_warnings()
def parseConfig(cfname=None):
    """Load and validate the YAML configuration of this script.

    Args:
        cfname: Path of the configuration file. When it is ``None`` or does
            not exist, ``<script name>.yml`` located next to this script is
            used instead.

    Returns:
        dict: The parsed configuration. ``NOTICE.NOTIFY_ADMIN_ONLY`` defaults
        to ``False`` and ``NOTICE.SPECIAL_ACCOUNTS`` defaults to ``{}`` when
        they are not configured.

    Raises:
        SystemExit: with code 1 when the file cannot be read or parsed, is not
            a mapping, or lacks required parameters; with code 2 when
            ``NOTICE.LARK`` is not executable; with a text message when
            ``LSF_ENVDIR`` is missing from the environment or the ``LSF``
            section.
    """
    appConf = None
    try:
        if cfname is None or not os.path.exists(cfname):
            scriptName = os.path.basename(__file__).split('.')[0]
            cfname = os.path.join(os.path.dirname(__file__), scriptName + ".yml")
        with open(cfname, 'r') as fd:
            appConf = yaml.load(fd, Loader=yaml.FullLoader)
    except Exception as e:
        # Top-level boundary: any read/parse problem is fatal for the script.
        logging.error("Read configuration failed! Error:{}".format(str(e)))
        sys.exit(1)

    # An empty YAML document loads as None; fail with a clear message instead
    # of crashing on the subscripting below.
    if not isinstance(appConf, dict):
        logging.error("Read configuration failed! Error:{} is empty or not a mapping".format(cfname))
        sys.exit(1)

    reqParams = {
        'NOTICE': ['NOTIFY_ADMINS', 'LARK', 'TMPDIR'],
        'LOGGERCONFIG': ['formatters', 'handlers', 'root'],
    }
    # A section that is absent (or not a mapping) counts as missing all of its
    # required parameters rather than raising KeyError.
    missingParams = {}
    for section, names in reqParams.items():
        sectionConf = appConf.get(section)
        present = sectionConf.keys() if isinstance(sectionConf, dict) else ()
        missing = set(names) - set(present)
        if missing:
            missingParams[section] = missing
    if missingParams:
        logging.error('Missing parameters: {}'.format(missingParams))
        sys.exit(1)

    # LSF_ENVDIR has to be present both in the environment and in the LSF
    # section of the configuration (same rule as before, minus the KeyError
    # when the LSF section itself is absent).
    lsfConf = appConf.get('LSF')
    if ('LSF_ENVDIR' not in os.environ
            or not isinstance(lsfConf, dict)
            or 'LSF_ENVDIR' not in lsfConf):
        sys.exit('Missing LSF_ENVDIR, exit...')

    if not os.access(appConf['NOTICE']['LARK'], os.X_OK):
        logging.error('Can\'t execute {}, please check.'.format(appConf['NOTICE']['LARK']))
        sys.exit(2)

    # Optional settings: fill in defaults so later code can read them directly.
    if 'NOTIFY_ADMIN_ONLY' not in appConf['NOTICE']:
        appConf['NOTICE']['NOTIFY_ADMIN_ONLY'] = False
    if not appConf['NOTICE'].get('SPECIAL_ACCOUNTS'):
        appConf['NOTICE']['SPECIAL_ACCOUNTS'] = {}
    return appConf
class lsfUnknownJobChecker():
    """Collect LSF jobs whose status is unknown and notify people via Lark.

    The module-level name ``lsf`` (the LSF or OpenLava Python binding) must be
    bound before this class is instantiated; the script does that in its
    ``__main__`` section.
    """

    # Option value passed to lsb_openjobinfo().
    # NOTE(review): presumably CUR_JOB (unfinished jobs) from lsbatch.h -- confirm.
    _JOBINFO_OPTIONS = 0x0010
    # Status bit tested on every job record.
    # NOTE(review): presumably JOB_STAT_UNKWN from lsbatch.h -- confirm.
    _STATUS_UNKNOWN = 0x10000
    # Seconds to wait for the notification endpoint before giving up, so a
    # dead endpoint cannot hang the script forever.
    _REQUEST_TIMEOUT = 30

    def __init__(self, config):
        """Store the configuration and initialise the LSF batch library.

        Args:
            config: Parsed configuration mapping; the ``NOTICE`` section is
                read when notifications are sent.

        Raises:
            SystemExit: when ``lsb_init`` reports a failure.
        """
        self._config = config
        self._unknownJobs = []
        # The LSF API documents lsb_init() as returning 0 on success and a
        # negative value on failure, so "> 0" would never detect an error.
        if lsf.lsb_init('lsf') != 0:
            msg = 'Failed to connect LSF, please check LSF health.'
            logging.error(msg)
            sys.exit(msg)

    def run(self):
        """Collect the unknown jobs, then send one notification per job."""
        self._checkUnknownJob()
        self._notify()

    # Shall run on LSF node
    def _checkUnknownJob(self):
        """Query LSF and store the jobs with the unknown-status bit set.

        The result is stored in ``self._unknownJobs`` as a list of dicts with
        the keys ``user``, ``jobId``, ``queue``, ``executeHost``,
        ``startTime`` and ``command``.
        """
        jobs = []
        more = lsf.new_intp()
        lsf.intp_assign(more, 0)
        jnum = lsf.lsb_openjobinfo(0, '', 'all', '', '', self._JOBINFO_OPTIONS)
        if jnum > 0:
            try:
                for _ in range(jnum):
                    jr = lsf.lsb_readjobinfo(more)
                    if not jr or not jr.status & self._STATUS_UNKNOWN:
                        continue
                    # The 64-bit job id carries the array index in its upper
                    # 32 bits and the base job id in its lower 32 bits.
                    jobIndex = jr.jobId >> 32
                    jobId = jr.jobId & 0xFFFFFFFF
                    jobs.append({
                        "user": jr.user,
                        "jobId": '{}'.format(jobId) if jobIndex == 0 else '{}[{}]'.format(jobId, jobIndex),
                        "queue": jr.submit.queue,
                        "executeHost": self._exHosts(jr.exHosts, jr.numExHosts),
                        "startTime": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(jr.startTime)),
                        "command": jr.submit.command,
                    })
            finally:
                # Always release the job-info connection, even when reading a
                # record raised.
                lsf.lsb_closejobinfo()
        self._unknownJobs = jobs

    def _exHosts(self, hostList, numHost):
        """Return the distinct execution hosts as a space-separated string.

        Duplicates are dropped while the first-seen order is kept, so the
        text of the notification is deterministic.
        """
        hosts = lsf.string_array_to_pylist(hostList, numHost)
        return ' '.join(dict.fromkeys(hosts))

    def _buildMessage(self, job):
        """Build the notification text for one job dict."""
        parts = [
            f"由于计算节点 {job['executeHost']} 发生硬件故障,需要下线维修。很抱歉,受此影响用户帐号 {job['user']} 的任务 {job['jobId']} 状态异常。建议你终止任务,并重新提交。\n",
            f"接收队列:{job['queue']}\n",
            f"执行主机:{job['executeHost']}\n",
            f"开始时间:{job['startTime']}\n",
            f"任务命令:{job['command']}\n\n",
            f"任务终止方法:在终端中运行命令 bkill {job['jobId']}",
        ]
        return ''.join(parts)

    def _receivers(self, user):
        """Return the list of accounts to notify for a job owned by ``user``.

        The administrators always come first. Unless ``NOTIFY_ADMIN_ONLY`` is
        set, the job owner is appended, mapped through ``SPECIAL_ACCOUNTS``
        when an entry exists for that account. Duplicates are removed.
        """
        notice = self._config['NOTICE']
        receivers = list(notice.get('NOTIFY_ADMINS') or [])
        if not notice.get('NOTIFY_ADMIN_ONLY', False):
            # SPECIAL_ACCOUNTS is optional; treat a missing/empty value as "no mapping".
            special = notice.get('SPECIAL_ACCOUNTS') or {}
            receivers.append(special.get(user) or user)
        return list(dict.fromkeys(receivers))

    def _notify(self):
        """Log and send one notification for every collected unknown job."""
        if not self._unknownJobs:
            logging.info("未发现状态未知的任务。")
            return
        for job in self._unknownJobs:
            msg = self._buildMessage(job)
            logging.info(msg)
            receivers = self._receivers(job['user'])
            if receivers:
                self._sendLark(msg, receivers)

    def _sendLark(self, message, receivers):
        """POST the notification to the configured Lark endpoint.

        Args:
            message: Text placed in the ``instance.content`` field.
            receivers: A list of accounts, or a whitespace-separated string.

        Failures are logged, never raised, so one failed delivery does not
        stop the remaining notifications.
        """
        try:
            notice = self._config['NOTICE']
            header = {'Content-Type': 'application/json', 'Authorization': notice['TOKEN']}
            if isinstance(receivers, (list, tuple)):
                targets = list(receivers)
            else:
                targets = receivers.split()
            data = {
                'trigger_key': 'LSF任务状态异常提醒',
                'instance': {'content': message},
                'notice': targets,
            }
            # NOTE(security): verify=False disables TLS certificate checks.
            # It is kept as in the original; confirm that this is intended.
            with requests.Session() as session:
                result = session.post(
                    notice['LARKURL'],
                    headers=header,
                    data=json.dumps(data),
                    verify=False,
                    timeout=self._REQUEST_TIMEOUT,
                )
            if result.status_code == 200:
                logging.info('Send message succeeded.')
            else:
                logging.error(f"Send message failed. Error: {result.text}")
        except Exception as e:
            logging.error(f"Send message failed. Error: {str(e)}, Stack: {traceback.format_exc()}")
if __name__ == "__main__":
    # Load and validate the configuration before anything else.
    config = parseConfig()

    # Choose the Python binding that matches the scheduler installation,
    # based on whether the LSF_ENVDIR path mentions "openlava".
    if 'openlava' not in os.environ['LSF_ENVDIR']:
        from pythonlsf import lsf
    else:
        LSF = False
        from pythonOpenlava import lsf

    # Apply the logging setup taken from the LOGGERCONFIG section.
    logging.config.dictConfig(config['LOGGERCONFIG'])
    logger = logging.getLogger(os.path.basename(__file__))

    # Collect the unknown jobs and send the notifications.
    lsfChecker = lsfUnknownJobChecker(config)
    lsfChecker.run()
如果主机上的 SBD 服务终止,也会导致作业状态变为 unknown,但这种情况可以通过重启 SBD 服务恢复。为避免在此场景下误发通知,建议仅在确认是硬件故障导致作业状态未知时,手工运行此脚本来发送通知。