芯片设计环境通常有比较严格的安全管理,用户帐号的密码过期后就不能登录环境,影响用户工作。为减少影响,应该提前向用户发送提醒,以便其及时更新密码。
通过IDM REST API接口获取所有帐号信息,根据以下条件过滤,筛选出即将过期帐号和已过期帐号。
帐号类型为正式员工(employee)、实习生(intern)或供应商(vendor,示例代码中同样纳入检查)
帐号没有在需要忽略的清单中
帐号过期日期相对于今天的间隔时间在指定间隔时间内
筛选出需要通知的帐号列表以后,遍历列表,如果帐号与公司人事系统不一致,使用公司人事系统帐号,最后记录日志并发送飞书提醒给用户。
飞书通知示例如下:
示例代码:
#!/opt/miniconda3/bin/python
##########################################
## Author: shuguangbo
##########################################
import os
import re
import datetime
import sys
import traceback
import time
import yaml
import logging
import logging.config
import re
import urllib3
import requests
import json
import ipahttp
urllib3.disable_warnings()
def parseConfig(cfname=None):
    """Load and validate the YAML configuration of the expiry checker.

    Args:
        cfname: Path of the configuration file. When it is None or does not
            exist, ``<script name>.yml`` is used instead, looked up in
            ``$CONF_DIR`` when that variable is set, otherwise next to this
            script.

    Returns:
        dict: The parsed configuration. Inside ``NOTICE`` the optional keys
        are filled in when absent or empty: ``NOTIFY_ADMIN_ONLY`` (False),
        ``IGNORED_ACCOUNTS`` ([]) and ``SPECIAL_ACCOUNTS`` ({}).

    Raises:
        SystemExit: status 1 when the file cannot be read/parsed or a
            required parameter is missing, status 4 when ``NOTICE_DATES``
            is empty.
    """
    appConf = None
    try:
        if cfname is None or not os.path.exists(cfname):
            confDir = os.environ.get('CONF_DIR', os.path.dirname(__file__))
            cfname = os.path.join(confDir, os.path.basename(__file__).split('.')[0] + ".yml")
        # Explicit encoding so the result does not depend on the locale of
        # the process that runs the script.
        with open(cfname, 'r', encoding='utf-8') as fd:
            # NOTE(review): FullLoader is kept as in the original; consider
            # yaml.safe_load if this file can be edited by untrusted users.
            appConf = yaml.load(fd, Loader=yaml.FullLoader)
    except Exception as e:
        logging.error("Read configuration failed! Error:{}".format(str(e)))
        sys.exit(1)

    # An empty file parses to None and a scalar-only file to a non-dict.
    if not isinstance(appConf, dict):
        logging.error("Read configuration failed! Error:{}".format('configuration is not a mapping'))
        sys.exit(1)

    reqParams = {'IDM': ['IDM_URL', 'IDM_USER', 'IDM_PASS'],
                 'NOTICE': ['ADMINS', 'NOTICE_DATES', 'TMPDIR', 'TOKEN', 'LARKURL'],
                 'LOGGERCONFIG': ['formatters', 'handlers', 'root']}
    missingParams = {}
    for section, names in reqParams.items():
        present = appConf.get(section)
        # A missing (or non-mapping) section means all of its parameters
        # are missing; report that instead of crashing on the lookup.
        if not isinstance(present, dict):
            present = {}
        absent = set(names) - set(present)
        if absent:
            missingParams[section] = absent
    if missingParams:
        logging.error('Missing parameters: {}'.format(missingParams))
        sys.exit(1)

    notice = appConf['NOTICE']
    if not notice['NOTICE_DATES']:
        logging.error('Empty notice dates!')
        sys.exit(4)

    # Optional settings: normalise absent/null values to usable defaults.
    notice.setdefault('NOTIFY_ADMIN_ONLY', False)
    if not notice.get('IGNORED_ACCOUNTS'):
        notice['IGNORED_ACCOUNTS'] = []
    if not notice.get('SPECIAL_ACCOUNTS'):
        notice['SPECIAL_ACCOUNTS'] = {}
    return appConf
class accountExpiryChecker():
    """Find IDM accounts whose password expires soon (or already has) and
    send a Lark reminder for each of them.

    The ``config`` dict is read as follows:
      * ``IDM``: ``IDM_URL`` (one URL or a list tried in order),
        ``IDM_USER``, ``IDM_PASS``.
      * ``NOTICE``: ``NOTICE_DATES`` (day offsets on which a reminder is
        sent; negative means "days after expiry"), ``ADMINS``,
        ``LARKURL``, ``TOKEN`` and the optional ``NOTIFY_ADMIN_ONLY``,
        ``IGNORED_ACCOUNTS`` and ``SPECIAL_ACCOUNTS`` (a mapping from IDM
        account name to the name that should receive the notice).
    """

    # IDM ``employeetype`` values that are subject to the check.
    _CHECKED_TYPES = ('employee', 'intern', 'vendor')
    # Seconds to wait for the Lark endpoint, so one stuck request cannot
    # hang the whole run.
    _LARK_TIMEOUT = 30

    def __init__(self, config):
        """Store ``config`` and log in to IDM (exits the process on failure)."""
        self._config = config
        self._idmClient = self._createIDMClient()
        self._expiryList = []

    def _createIDMClient(self):
        """Log in to the first reachable IDM server and return the client.

        Raises:
            SystemExit: status 2 when no configured server accepts the login
                or the IDM settings cannot be read.
        """
        idm = None
        connected = False
        try:
            idmConf = self._config['IDM']
            user = idmConf['IDM_USER']
            password = idmConf['IDM_PASS']
            urls = idmConf['IDM_URL']
            urlList = urls if isinstance(urls, list) else [urls]
            for url in urlList:
                try:
                    idm = ipahttp.ipa(url)
                    ret = idm.login(user, password)
                    if ret.status_code != 200:
                        logging.error("Login IDM %s failed, error:%s."%(url, ret.text))
                        time.sleep(5)
                        continue
                    connected = True
                    break
                except Exception as e:
                    # Best effort: a broken server must not prevent trying the next one.
                    logging.error("Login IDM {} failed. Error: {}, Stack: {}".format(url, str(e), traceback.format_exc()))
                    continue
        except Exception as e:
            logging.error("Login IDM failed. Error: {}, Stack: {}".format(str(e), traceback.format_exc()))
            sys.exit(2)
        if not connected:
            logging.error("Failed to loging IDM. Exit!")
            sys.exit(2)
        logging.info("Login IDM succeeded.")
        return idm

    def _checkExpiry(self):
        """Collect the accounts that are candidates for a reminder.

        An account is kept when it has a display name, an employee type in
        ``_CHECKED_TYPES`` and a password expiry timestamp, is not locked,
        is not listed in ``IGNORED_ACCOUNTS`` and expires no later than the
        largest absolute value in ``NOTICE_DATES`` days from today (already
        expired accounts are always kept). The result is stored in
        ``self._expiryList`` as dicts with the keys ``cname``, ``name``,
        ``expiry`` (``YYYY-MM-DD``) and ``delta`` (days until expiry).
        """
        notice = self._config['NOTICE']
        users = self._idmClient.user_find()['result']['result']
        ignored = notice.get('IGNORED_ACCOUNTS') or ()
        maxExpiryDay = max(abs(item) for item in notice['NOTICE_DATES'])
        # Evaluated once so every account is compared against the same day.
        today = datetime.date.today()
        logging.info('Total {} accounts. Max expiry notice day is {}'.format(len(users), maxExpiryDay))

        expiryList = []
        for user in users:
            name = user['uid'][0]
            cname = user['displayname'][0] if 'displayname' in user else '-'
            atype = user['employeetype'][0] if 'employeetype' in user else '-'
            ts = user['krbpasswordexpiration'][0] if 'krbpasswordexpiration' in user else '-'
            # An entry without the attribute is treated as not locked.
            locked = user.get('nsaccountlock', False)
            if cname == '-' or atype == '-' or ts == '-' or locked:
                continue
            if atype not in self._CHECKED_TYPES:
                continue
            if name in ignored:
                continue
            # NOTE(review): some FreeIPA API versions presumably wrap
            # timestamps as {'__datetime__': 'YYYYmmddHHMMSSZ'} - confirm
            # against the server in use; plain strings work unchanged.
            if isinstance(ts, dict):
                ts = ts.get('__datetime__', '')
            try:
                expiry = datetime.datetime.strptime(str(ts)[0:8], '%Y%m%d').date()
            except ValueError:
                # One malformed entry must not abort the whole run.
                logging.warning('Skip account {}: invalid password expiration {!r}'.format(name, ts))
                continue
            delta = (expiry - today).days
            if delta > maxExpiryDay:
                continue
            data = {'cname': cname, 'name': name, 'expiry': expiry.strftime('%Y-%m-%d'), 'delta': delta}
            expiryList.append(data)
            logging.debug('{}'.format(data))
        self._expiryList = expiryList

    @staticmethod
    def _describeExpiry(expiry):
        """Return ``(log message, reminder text)`` for one ``_expiryList`` entry."""
        delta = expiry['delta']
        if delta > 0:
            reminder = f"将在 {delta} 天后过期"
        elif delta == 0:
            reminder = "在今天过期"
        else:
            reminder = f"已过期 {abs(delta)} 天"
        msg = '用户 {} IC环境帐号 {} 密码有效期至 {},{}。请及时更新密码。'.format(
            expiry['cname'], expiry['name'], expiry['expiry'], reminder)
        return msg, reminder

    def _notifyExpiry(self):
        """Log and send a reminder for every collected account whose
        ``delta`` is one of the configured ``NOTICE_DATES``.

        Receivers are the ``ADMINS`` plus, unless ``NOTIFY_ADMIN_ONLY`` is
        set, the account itself (or its ``SPECIAL_ACCOUNTS`` replacement).
        """
        notice = self._config['NOTICE']
        adminOnly = notice.get('NOTIFY_ADMIN_ONLY', False)
        logging.info(f"Notify admin only {adminOnly}")
        noticeDates = set(notice['NOTICE_DATES'])
        special = notice.get('SPECIAL_ACCOUNTS') or {}
        admins = ' '.join(notice['ADMINS'])

        for expiry in self._expiryList:
            if expiry['delta'] not in noticeDates:
                continue
            user = special.get(expiry['name']) or expiry['name']
            receiver = admins if adminOnly else admins + ' ' + user
            msg, reminder = self._describeExpiry(expiry)
            # Module-level logging, like the rest of the class: the global
            # ``logger`` only exists when the file runs as a script.
            logging.info(msg)
            data = {'user': expiry['cname'], 'account': expiry['name'], 'expiry': expiry['expiry'], 'reminder': reminder}
            if receiver:
                self._sendLark(data, receiver)

    def _sendLark(self, message, receivers):
        """POST one reminder to the configured Lark endpoint.

        Args:
            message: Dict placed in the ``instance`` field of the request.
            receivers: List of receiver names, or one whitespace-separated
                string of names.

        Failures are logged and never raised, so one failed notification
        does not stop the remaining ones.
        """
        try:
            notice = self._config['NOTICE']
            header = {'Content-Type': 'application/json', 'Authorization': notice['TOKEN']}
            data = {
                'trigger_key': 'IC环境IDM帐号密码过期提醒',
                'instance': message,
                'notice': receivers if isinstance(receivers, list) else receivers.split(),
            }
            # NOTE(review): verify=False disables TLS certificate checks;
            # kept as in the original, confirm this is intended.
            with requests.Session() as session:
                result = session.post(notice['LARKURL'], headers=header, data=json.dumps(data),
                                      verify=False, timeout=self._LARK_TIMEOUT)
                if result.status_code == 200:
                    logging.info('Send message succeeded.')
                else:
                    logging.error(f"Send message failed. Error: {result.text}")
        except Exception as e:
            logging.error(f"Send message failed. Error: {str(e)}, Stack: {traceback.format_exc()}")

    def run(self):
        """Collect the expiring accounts, then send the reminders."""
        self._checkExpiry()
        self._notifyExpiry()
if __name__ == "__main__":
    # Script entry point: the configuration is loaded first because the
    # logging setup itself comes from its LOGGERCONFIG section.
    config = parseConfig()
    logging.config.dictConfig(config['LOGGERCONFIG'])
    # Logger named after the script file. It stays a module-level global
    # because code elsewhere in this file may look up ``logger`` by name.
    logger = logging.getLogger(os.path.basename(__file__))
    # Log in to IDM, collect the expiring accounts and send the reminders.
    accountExpiryChecker(config).run()
通过设置配置文件中的参数 NOTICE_DATES 控制发送通知的频率。下面的配置表示在过期前第15、7、4、2、1天和过期当天发送通知;过期后则在第1、2、4、7天发送,之后每7天发送一次直到第70天,再在第90、120、150、180、210天发送。
NOTICE_DATES: [15, 7, 4, 2, 1, 0, -1, -2, -4, -7, -14, -21, -28, -35, -42, -49, -56, -63, -70, -90, -120, -150, -180, -210]
可以通过 crontab 或 Jenkins 每天运行一次即可。