将正式环境数据库导入至灰度环境,并替换域名、数据库ip、密码
注意:脚本中需替换源数据库及目标数据库相关信息,备份sql目录,数据库库名,需替换的域名ip信息
1、基于python3.6
2、安装pymysql、os模块
# -*- coding: utf-8 -*-
# @Time : 2023/10/21 15:03
# @File : ExportImportDB.py
# @Description :
import pymysql
import logging
import os
import time
class ExportImportDB:
def __init__(self):
self.Tim = time.strftime('%Y%m%d', time.localtime(time.time()))
#备份目录(自定义)
self.SQLDir = '/webtv/mpctest/dbmysql/pre/'
self.LogFile = self.SQLDir + self.Tim + '.log'
# 本地保留2天
self.SaveDays = 2
# 指定源数据库ip
self.sourceDBHost = '10.101.89.52'
# 源数据库用户
self.sourceDBUser = 'mysql'
# 源数据库密码
self.sourceDBPassword = '******'
# 数据库端口号
self.sourceDBPort = 3306
# 导出的库(例:webtv_cms)
self.sourceDB = [
"webtv_cms",
]
# 不做字符串替换的库
# self.sourceIgnoreReplaceDB = ['appfac_appfac_b81ab1497ae1133dbc41e584912d77aa']
# 目标数据库信息
self.targetDBHost = '192.168.3.76'
self.targetDBUser = 'mysql'
self.targetDBPassword = '******'
self.targetDBPort = 3306
# 替换文本信息
# source 信息替换 targer
# 本环境为线上同步至灰度,所以需替换域名及ip密码等信息,如不需要替换可直接注释
self.replaceList = [
{'source': '.junhao.mil.cn', 'target': '-pre.junhao.mil.cn'},
{'source': 'JH-Mem129', 'target': 'JhPre-huidu2022'},
{'source': '10.101.89.86', 'target': '192.168.6.201'},
]
def Clean(self):
"""
:desc: 清理文件
:return:
"""
clean_res = os.system("find %s -mtime +%s -type d -exec rm -rf {} \;" % (self.SQLDir, self.SaveDays))
logging.error("clean files failed") if clean_res != 0 else logging.info("clean files success")
def testRun(self):
"""
:desc: 测试数据库链接情况
:return: {'status': 'success'}
"""
try:
ConnectSourceMysql = pymysql.connect(host=self.sourceDBHost, user=self.sourceDBUser,
passwd=self.sourceDBPassword, port=self.sourceDBPort)
ConnectSourceMysql.cursor()
ConnectTargetMysql = pymysql.connect(host=self.sourceDBHost, user=self.sourceDBUser,
passwd=self.sourceDBPassword, port=self.sourceDBPort)
ConnectTargetMysql.cursor()
return {'status': 'success'}
except Exception as e:
return {'status': 'error', 'message': str(e)}
def RunTask(self):
"""
:desc: 执行导出导入
:return:
"""
# 日志
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', filename=self.LogFile, filemode='a')
# 当前导出目录
exportDir = self.SQLDir + self.Tim
if not os.path.exists(exportDir):
os.makedirs(exportDir)
# 导出sql
for dbname in self.sourceDB :
result = os.system(
"mysqldump --set-gtid-purged=off --skip-lock-tables --ignore-table=webtv_cms.zdconfig -P %s -u%s -p'%s' -h %s %s > %s/%s.sql" % (
self.sourceDBPort, self.sourceDBUser, self.sourceDBPassword, self.sourceDBHost, dbname, exportDir,
dbname))
os.system("sed -i '1iuse %s' %s/%s.sql" % (dbname, exportDir, dbname))
if result != 0:
print('dump %s failed !! ' % (dbname))
logging.error('dump %s failed !! ' % (dbname))
else:
print('dump db %s success' % (dbname))
logging.info('dump db %s success' % (dbname))
# 替换数据
for replace in self.replaceList:
for dbname in self.sourceDB:
result = os.system(
"sed -i 's/%s/%s/g' %s/%s.sql" % (replace['source'], replace['target'], exportDir, dbname)
)
print("sed -i 's/%s/%s/g' %s/%s.sql" % (replace['source'], replace['target'], exportDir, dbname))
if result != 0:
logging.error('replace db %s source string %s failed !! ' % (dbname, replace['source']))
else:
logging.info("replace db %s source string %s success" % (dbname, replace['source']))
# # 导入数据
for dbname in self.sourceDB :
result = os.system(
"mysql -P %s -u%s -p'%s' -h %s %s < %s/%s.sql" % (
self.targetDBPort, self.targetDBUser, self.targetDBPassword, self.targetDBHost, dbname, exportDir,
dbname)
)
if result != 0:
logging.error('import %s failed !! ' % (dbname))
else:
logging.info("import db %s success" % (dbname))
if __name__ == '__main__':
run = ExportImportDB()
testResult = run.testRun()
if testResult['status'] == 'success':
print('mysql connected success')
run.RunTask()
else:
print(run['message'])
exit(1)
# 执行脚本命令
python3.6 ExportImportDB.py
# 后台执行
nohup python3.6 ExportImportDB.py &