LSF unknown 状态任务提醒

发布时间:2024年01月02日

在实际工作中,LSF 计算节点可能因硬件故障,导致其上运行的作业状态变为未知(UNKNOWN)。此时需要通知作业属主终止作业并重新提交。为了降低人工工作量,需要能够自动发送通知。

我们可以通过 LSF Python API 收集状态为未知的作业,获取作业的属主、执行主机名称、队列等信息,根据这些信息生成通知,再通过邮件、飞书或其它方式发送给用户。

通知示例如下(原文示例图片未能保留,通知内容的具体格式可参见下方代码中 `_notify` 方法拼接的消息):

下面是通过 LSF API 获取作业状态并通过飞书发送通知给用户的示例代码。

#!/opt/miniconda3/bin/python
import os
import re
import datetime
import shutil
import sys
import stat
import traceback
import time
import yaml
import logging
import logging.config
import re
import urllib3
import requests
import json
import string
import tempfile
import errno
import urllib3

urllib3.disable_warnings()

def findMissingParams(appConf, reqParams):
    """Return the required keys that are missing from each configuration section.

    :param appConf: parsed configuration (mapping of section name -> mapping).
    :param reqParams: mapping of section name -> iterable of required keys.
    :returns: dict of section name -> set of missing keys. A section that is
        absent or is not a mapping reports all of its required keys. Sections
        with nothing missing are omitted, so an empty dict means "complete".
    """
    missing = {}
    for section, keys in reqParams.items():
        present = appConf.get(section)
        presentKeys = set(present) if isinstance(present, dict) else set()
        absent = set(keys) - presentKeys
        if absent:
            missing[section] = absent
    return missing


def parseConfig(cfname=None):
    """Load and validate the YAML configuration of the unknown-job notifier.

    When *cfname* is None or does not exist, the file next to this script with
    the same base name and a ``.yml`` extension is used instead.

    :param cfname: optional path to the configuration file.
    :returns: the parsed configuration dict. ``NOTICE.NOTIFY_ADMIN_ONLY``
        defaults to False and ``NOTICE.SPECIAL_ACCOUNTS`` defaults to ``{}``.

    Exits the process instead of raising:
      * status 1 if the file cannot be read, is not a mapping, or lacks
        required parameters;
      * with the message 'Missing LSF_ENVDIR, exit...' if LSF_ENVDIR is absent
        from the environment or from the ``LSF`` section;
      * status 2 if ``NOTICE.LARK`` is not executable.
    """
    appConf = None
    try:
        if cfname is None or not os.path.exists(cfname):
            # Fall back to "<script name>.yml" located next to this script.
            baseName = os.path.splitext(os.path.basename(__file__))[0]
            cfname = os.path.join(os.path.dirname(__file__), baseName + ".yml")
        with open(cfname, 'r', encoding='utf-8') as fd:
            # safe_load: the configuration only needs plain YAML types.
            appConf = yaml.safe_load(fd)
    except Exception as e:
        logging.error("Read configuration failed! Error:{}".format(str(e)))
        sys.exit(1)

    # An empty file loads as None; anything other than a mapping cannot be validated.
    if not isinstance(appConf, dict):
        logging.error("Read configuration failed! Error:{}".format("configuration must be a YAML mapping"))
        sys.exit(1)

    # LARKURL and TOKEN are read unconditionally when a notification is sent,
    # so fail fast here instead of failing on every send.
    reqParams = {'NOTICE': ['NOTIFY_ADMINS', 'LARK', 'TMPDIR', 'LARKURL', 'TOKEN'],
                 'LOGGERCONFIG': ['formatters', 'handlers', 'root']}

    missingParams = findMissingParams(appConf, reqParams)
    if missingParams:
        logging.error('Missing parameters: {}'.format(missingParams))
        sys.exit(1)

    # Both the environment variable and LSF.LSF_ENVDIR in the config are required.
    # NOTE(review): the config value is not read anywhere else in this script;
    # confirm whether it was meant as a fallback for the environment variable.
    lsfSection = appConf.get('LSF')
    if ('LSF_ENVDIR' not in os.environ
            or not isinstance(lsfSection, dict)
            or 'LSF_ENVDIR' not in lsfSection):
        sys.exit('Missing LSF_ENVDIR, exit...')

    notice = appConf['NOTICE']
    lark = notice['LARK']
    if not lark or not os.access(lark, os.X_OK):
        logging.error('Can\'t execute {}, please check.'.format(lark))
        sys.exit(2)

    notice.setdefault('NOTIFY_ADMIN_ONLY', False)
    # A missing key or an empty YAML value (None) becomes an empty mapping,
    # so per-user lookups in the notifier always work.
    if not notice.get('SPECIAL_ACCOUNTS'):
        notice['SPECIAL_ACCOUNTS'] = {}

    return appConf

class lsfUnknownJobChecker():
    """Find LSF jobs whose status is UNKNOWN and send a Lark notice for each.

    Relies on the module-level name ``lsf`` (``pythonlsf.lsf`` or
    ``pythonOpenlava.lsf``), which the ``__main__`` section imports before
    constructing this class.

    :param config: parsed configuration; only the ``NOTICE`` section is read.
    """

    # lsb_openjobinfo() option CUR_JOB: all unfinished jobs (lsbatch.h).
    _CUR_JOB = 0x0010
    # jobInfoEnt.status bit JOB_STAT_UNKWN: job state unknown (lsbatch.h).
    _JOB_STAT_UNKWN = 0x10000
    # Seconds to wait for the Lark endpoint unless NOTICE.TIMEOUT overrides it.
    _DEFAULT_TIMEOUT = 30

    def __init__(self, config):
        self._config = config
        self._unknownJobs = []
        # lsb_init() returns 0 on success and -1 on failure, so any non-zero
        # value is an error (a `> 0` test never detects a failed connection).
        if lsf.lsb_init('lsf') != 0:
            msg = 'Failed to connect LSF, please check LSF health.'
            logging.error(msg)
            sys.exit(msg)

    def run(self):
        """Collect UNKNOWN-state jobs, then notify about each of them."""
        self._checkUnknownJob()
        self._notify()

    # Shall run on LSF node
    def _checkUnknownJob(self):
        """Store every unfinished job in UNKNOWN state in ``self._unknownJobs``.

        Each entry is a dict with keys ``user``, ``jobId`` (``"id"`` or
        ``"id[index]"`` for array elements), ``queue``, ``executeHost``,
        ``startTime`` (local time, ``%Y-%m-%d %H:%M:%S``) and ``command``.
        """
        jobs = []
        ln = lsf.new_intp()
        lsf.intp_assign(ln, 0)
        jnum = lsf.lsb_openjobinfo(0, '', 'all', '', '', self._CUR_JOB)
        try:
            if jnum > 0:
                for _ in range(jnum):
                    jr = lsf.lsb_readjobinfo(ln)
                    if jr and jr.status & self._JOB_STAT_UNKWN:
                        jobs.append(self._jobRecord(jr))
        finally:
            # Release the job-info connection even if a record fails to convert.
            lsf.lsb_closejobinfo()
        self._unknownJobs = jobs

    def _jobRecord(self, jr):
        """Convert one ``lsb_readjobinfo`` record into the dict used for notices."""
        # The array index sits in the high 32 bits of jobId, the job number in the low 32.
        jobIndex = jr.jobId >> 32
        jobId = jr.jobId & 0xFFFFFFFF
        return {
            "user": jr.user,
            "jobId": '{}'.format(jobId) if jobIndex == 0 else '{}[{}]'.format(jobId, jobIndex),
            "queue": jr.submit.queue,
            "executeHost": self._exHosts(jr.exHosts, jr.numExHosts),
            "startTime": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(jr.startTime)),
            "command": jr.submit.command,
        }

    def _exHosts(self, hostList, numHost):
        """Return the distinct execution hosts as a space-separated string.

        A host may appear more than once in the LSF list; duplicates are
        dropped while keeping the reported order, so messages are stable.
        """
        return ' '.join(dict.fromkeys(lsf.string_array_to_pylist(hostList, numHost)))

    @staticmethod
    def _buildMessage(job):
        """Build the notification text for one unknown-state job dict."""
        return (
            f"由于计算节点 {job['executeHost']} 发生硬件故障,需要下线维修。很抱歉,受此影响用户帐号 {job['user']} 的任务 {job['jobId']} 状态异常。建议你终止任务,并重新提交。\n"
            f"接收队列:{job['queue']}\n"
            f"执行主机:{job['executeHost']}\n"
            f"开始时间:{job['startTime']}\n"
            f"任务命令:{job['command']}\n\n"
            f"任务终止方法:在终端中运行命令 bkill {job['jobId']}"
        )

    def _receivers(self, user):
        """Return the de-duplicated list of Lark receivers for a job owned by *user*.

        Admins from ``NOTICE.NOTIFY_ADMINS`` (a list or a whitespace-separated
        string) always come first. Unless ``NOTICE.NOTIFY_ADMIN_ONLY`` is set,
        the job owner is appended, mapped through ``NOTICE.SPECIAL_ACCOUNTS``
        when an entry exists for *user*.
        """
        notice = self._config['NOTICE']
        admins = notice.get('NOTIFY_ADMINS') or []
        if isinstance(admins, str):
            admins = [admins]
        # Split every entry on whitespace, as the original space-joined string did.
        names = [name for entry in admins for name in str(entry).split()]
        if not notice.get('NOTIFY_ADMIN_ONLY', False):
            accounts = notice.get('SPECIAL_ACCOUNTS') or {}
            names.extend(str(accounts.get(user) or user).split())
        return list(dict.fromkeys(names))

    def _notify(self):
        """Log and send one notification per collected unknown-state job."""
        if not self._unknownJobs:
            logging.info("未发现状态未知的任务。")
            return
        for job in self._unknownJobs:
            msg = self._buildMessage(job)
            logging.info(msg)
            receivers = self._receivers(job['user'])
            if receivers:
                self._sendLark(msg, receivers)

    def _sendLark(self, message, receivers):
        """POST *message* to the Lark endpoint for *receivers* (list or space-separated str).

        Failures are logged, never raised, so one bad send does not stop the rest.
        """
        try:
            notice = self._config['NOTICE']
            header = {'Content-Type': 'application/json', 'Authorization': notice['TOKEN']}
            data = {
                'trigger_key': 'LSF任务状态异常提醒',
                'instance': {'content': message},
                'notice': receivers if isinstance(receivers, list) else receivers.split(),
            }
            timeout = notice.get('TIMEOUT', self._DEFAULT_TIMEOUT)
            # NOTE(review): verify=False disables TLS certificate validation;
            # keep it only if the endpoint's certificate cannot be verified.
            with requests.Session() as session:
                result = session.post(notice['LARKURL'], headers=header,
                                      data=json.dumps(data), verify=False,
                                      timeout=timeout)
            if result.status_code == 200:
                logging.info('Send message succeeded.')
            else:
                logging.error(f"Send message failed. Error: {result.text}")
        except Exception as e:
            logging.error(f"Send message failed. Error: {str(e)}, Stack: {traceback.format_exc()}")
    
if __name__ == "__main__":
    # Validate configuration first; parseConfig exits on any problem.
    config = parseConfig()
    # Pick the Python binding that matches the scheduler behind LSF_ENVDIR.
    # The import runs at module level, so `lsf` becomes the global that
    # lsfUnknownJobChecker uses.
    if 'openlava' not in os.environ['LSF_ENVDIR']:
        from pythonlsf import lsf
    else:
        LSF = False
        from pythonOpenlava import lsf
    logging.config.dictConfig(config['LOGGERCONFIG'])
    logger = logging.getLogger(os.path.basename(__file__))
    checker = lsfUnknownJobChecker(config)
    checker.run()

主机上的 SBD(sbatchd)服务终止同样会导致作业状态变为 unknown,但这种情况可以通过重启 SBD 服务恢复。为避免在此场景下误发通知,建议在确认作业状态未知确实由硬件故障导致之后,再手工运行此脚本发送通知。

文章来源:https://blog.csdn.net/weixin_71448448/article/details/135254172
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。