一键迁移数据库python脚本

发布时间:2024年01月18日

目的

将正式环境数据库导入至灰度环境,并替换域名、数据库ip、密码

注意:脚本中需替换源数据库目标数据库相关信息,备份sql目录数据库库名需替换的域名ip信息

前提条件

1、基于python3.6

2、安装pymysql、os模块

# -*- coding: utf-8 -*-
# @Time    : 2023/10/21 15:03
# @File    : ExportImportDB.py
# @Description :

import pymysql
import logging
import os
import time


class ExportImportDB:

    def __init__(self):

        self.Tim = time.strftime('%Y%m%d', time.localtime(time.time()))
        #备份目录(自定义)
        self.SQLDir = '/webtv/mpctest/dbmysql/pre/'
        self.LogFile = self.SQLDir + self.Tim + '.log'
        # 本地保留2天
        self.SaveDays = 2
        # 指定源数据库ip
        self.sourceDBHost = '10.101.89.52'
        # 源数据库用户
        self.sourceDBUser = 'mysql'
        # 源数据库密码
        self.sourceDBPassword = '******'
        # 数据库端口号
        self.sourceDBPort = 3306
        # 导出的库(例:webtv_cms)
        self.sourceDB = [
            "webtv_cms",
        ]
        # 不做字符串替换的库
#        self.sourceIgnoreReplaceDB = ['appfac_appfac_b81ab1497ae1133dbc41e584912d77aa']

        # 目标数据库信息
        self.targetDBHost = '192.168.3.76'
        self.targetDBUser = 'mysql'
        self.targetDBPassword = '******'
        self.targetDBPort = 3306

        # 替换文本信息
        # source 信息替换 targer
        # 本环境为线上同步至灰度,所以需替换域名及ip密码等信息,如不需要替换可直接注释
        self.replaceList = [
            {'source': '.junhao.mil.cn', 'target': '-pre.junhao.mil.cn'},
            {'source': 'JH-Mem129', 'target': 'JhPre-huidu2022'},
            {'source': '10.101.89.86', 'target': '192.168.6.201'},
        ]

    def Clean(self):
        """
        :desc: 清理文件
        :return:
        """
        clean_res = os.system("find %s -mtime +%s -type d -exec rm -rf {} \;" % (self.SQLDir, self.SaveDays))
        logging.error("clean files failed") if clean_res != 0 else logging.info("clean  files success")


    def testRun(self):
        """
        :desc: 测试数据库链接情况
        :return: {'status': 'success'}
        """

        try:

            ConnectSourceMysql = pymysql.connect(host=self.sourceDBHost, user=self.sourceDBUser,
                                                 passwd=self.sourceDBPassword, port=self.sourceDBPort)
            ConnectSourceMysql.cursor()
            ConnectTargetMysql = pymysql.connect(host=self.sourceDBHost, user=self.sourceDBUser,
                                                 passwd=self.sourceDBPassword, port=self.sourceDBPort)
            ConnectTargetMysql.cursor()
            return {'status': 'success'}

        except Exception as e:

            return {'status': 'error', 'message': str(e)}


    def RunTask(self):
        """
        :desc: 执行导出导入
        :return:
        """

        # 日志
        logging.basicConfig(level=logging.INFO, format='[%(asctime)s]  [%(levelname)s] %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S', filename=self.LogFile, filemode='a')

        # 当前导出目录
        exportDir = self.SQLDir + self.Tim
        if not os.path.exists(exportDir):
            os.makedirs(exportDir)

        # 导出sql
        for dbname in self.sourceDB :
           result = os.system(
               "mysqldump --set-gtid-purged=off --skip-lock-tables --ignore-table=webtv_cms.zdconfig -P %s -u%s -p'%s' -h %s %s > %s/%s.sql" % (
                   self.sourceDBPort, self.sourceDBUser, self.sourceDBPassword, self.sourceDBHost, dbname, exportDir,
                   dbname))
           os.system("sed -i '1iuse %s' %s/%s.sql" % (dbname, exportDir, dbname))
           if result != 0:
               print('dump %s failed !! ' % (dbname))
               logging.error('dump %s failed !! ' % (dbname))
           else:
               print('dump db %s success' % (dbname))
               logging.info('dump db %s success' % (dbname))

        # 替换数据
        for replace in self.replaceList:
            for dbname in self.sourceDB:
                result = os.system(
                    "sed -i 's/%s/%s/g' %s/%s.sql" % (replace['source'], replace['target'], exportDir, dbname)
                )
                print("sed -i 's/%s/%s/g' %s/%s.sql" % (replace['source'], replace['target'], exportDir, dbname))
                if result != 0:
                    logging.error('replace db %s source string %s failed !! ' % (dbname, replace['source']))
                else:
                    logging.info("replace db %s source string %s success" % (dbname, replace['source']))

        # # 导入数据
        for dbname in self.sourceDB :
            result = os.system(
                "mysql -P %s -u%s -p'%s' -h %s %s < %s/%s.sql" % (
                self.targetDBPort, self.targetDBUser, self.targetDBPassword, self.targetDBHost, dbname, exportDir,
                dbname)
            )
            if result != 0:
                logging.error('import %s failed !! ' % (dbname))
            else:
                logging.info("import db %s success" % (dbname))



if __name__ == '__main__':

    run = ExportImportDB()
    testResult = run.testRun()

    if testResult['status'] == 'success':
        print('mysql connected success')
        run.RunTask()
    else:
        print(run['message'])
        exit(1)

执行脚本

# 执行脚本命令
python3.6 ExportImportDB.py
# 后台执行
nohup python3.6 ExportImportDB.py &

文章来源:https://blog.csdn.net/weixin_46775222/article/details/135675251
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。