现有A系统作为下游数据系统,上游系统有A1,A2,A3...
需要将A1,A2,A3...的数据达到某条件后(比如:A1系统销售单提交出库成功)自动触发MQ然后再经过数据清洗落到A系统,并将清洗后数据通过特定规则汇总在A系统报表中
现在需要QA同学验证的功能是:
A系统存储数据清洗后的库表(为切片表)有几十个,且前置系统较多,测试工作量也较多
需要核对清洗后A存库数据是否准确
清洗规则:(1)直接取数 (2)拼接取数 (3)映射取数
直接取数字段在2系统表中字段命名规则一致
so,以下测试工具是针对直接取数规则来开发,以便于测试
(1)将表字段,来源系统表和切片表 数据库链接信息,查询字段 作为变量
将这些信息填入input.xlsx 作为入参
(2)读取表字段,根据来源系统表 数据库链接信息,查询字段
查询来源库表,将查询出字段值存储outfbi.xlsx
? (3)读取表字段,根据切片表 数据库链接信息,查询字段
查询切片库表,将查询出字段值存储outods.xlsx
(4)对比outfbi.xlsx,outods.xlsx的字段值
对比后生成result.xlsx文件,新增列校验结果
核对字段值一致校验结果为Success,否则为Fail
入参文件见附件
DbcheckApi.py
import os
import pymysql
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
import datetime
import ast
"""测试数据路径管理"""
SCRIPTS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
GENERATECASE_DIR = os.path.join(SCRIPTS_DIR, "dbcheck")
inputDATAS_DIR = os.path.join(GENERATECASE_DIR, "inputdata")
outDATAS_DIR = os.path.join(GENERATECASE_DIR, "outdata")
class DbcheckApi():
def __init__(self,data):
self.inputexcel=data
workbook = load_workbook(filename=self.inputexcel)
sheet = workbook['数据源']
# 读取来源表-连接信息
sourcedb_connection_info = ast.literal_eval(sheet['B3'].value)
odsdb_connection_info = ast.literal_eval(sheet['B4'].value)
source_db = sheet['C3'].value.strip()
ods_db = sheet['C4'].value.strip()
source_queryby = sheet['D3'].value.strip()
ods_queryby = sheet['D4'].value.strip()
print(sourcedb_connection_info)
print(odsdb_connection_info)
print(source_db)
print(ods_db)
print(source_queryby)
print(ods_queryby)
self.sourcedb = sourcedb_connection_info
self.odsdb = odsdb_connection_info
self.source_db = source_db
self.ods_db = ods_db
self.source_queryby = source_queryby
self.ods_queryby = ods_queryby
def source_select_db(self):
host = self.sourcedb.get('host')
port = self.sourcedb.get('port')
user = self.sourcedb.get('user')
passwd = self.sourcedb.get('passwd')
db = self.sourcedb.get('db')
if not host or not port or not user or not passwd or not db:
error_msg = "连接信息不完整"
return {"code": -1, "msg": error_msg, "data": ""}
cnnfbi = pymysql.connect(
host=host,
port=port,
user=user,
passwd=passwd,
db=db
)
cursor = cnnfbi.cursor()
try:
# 读取Excel文件
df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')
# 获取第1列,从第2行开始读取的字段名
fields = df.iloc[1:, 0].tolist()
print(fields)
# 构建查询SQL语句
sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.source_db, self.source_queryby)
print(sql)
# 执行查询语句
cursor.execute(sql)
except pymysql.err.OperationalError as e:
error_msg = str(e)
if "Unknown column" in error_msg:
column_name = error_msg.split("'")[1]
msg={"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}
print(msg)
return {"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}
else:
return {"code": -1, "msg": error_msg, "data": ""}
print(error_msg)
# 获取查询结果
result = cursor.fetchall()
# 关闭游标和连接
cursor.close()
cnnfbi.close()
# 检查查询结果是否为空
if not result:
return {"code": -1, "msg": f"查询无数据,请检查sql: {sql}", "data": ""}
else:
# 将结果转换为DataFrame对象
df = pd.DataFrame(result, columns=fields)
odskey=self.source_db+'表-字段'
odsvalue=self.source_db+'表-字段值'
# 创建新的DataFrame对象,将字段和对应值放在两列
df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})
outexcel = os.path.join(outDATAS_DIR, 'outputfbi.xlsx')
# 导出结果到Excel文件
df_new.to_excel(outexcel, index=False)
def ods_select_db(self):
host = self.odsdb.get('host')
port = self.odsdb.get('port')
user = self.odsdb.get('user')
passwd = self.odsdb.get('passwd')
db = self.odsdb.get('db')
if not host or not port or not user or not passwd or not db:
raise ValueError("连接信息不完整")
cnnfbi = pymysql.connect(
host=host,
port=port,
user=user,
passwd=passwd,
db=db
)
cursor = cnnfbi.cursor()
try:
# 读取Excel文件
df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')
# 获取第1列,从第2行开始读取的字段名
fields = df.iloc[1:, 0].tolist()
print(fields)
# 构建查询SQL语句
sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.ods_db, self.ods_queryby)
print(sql)
# 执行查询语句
cursor.execute(sql)
except pymysql.err.OperationalError as e:
error_msg = str(e)
if "Unknown column" in error_msg:
column_name = error_msg.split("'")[1]
return {"code": -1, "msg": f"列 {column_name} 不存在"+self.ods_db+" 表结构中,请检查!", "data": ""}
else:
return {"code": -1, "msg": error_msg, "data": ""}
# 获取查询结果
result = cursor.fetchall()
# 关闭游标和连接
cursor.close()
cnnfbi.close()
# 将结果转换为DataFrame对象
df = pd.DataFrame(result, columns=fields)
# 创建新的DataFrame对象,将字段和对应值放在两列
odskey=self.ods_db+'表-字段'
odsvalue=self.ods_db+'表-字段值'
df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})
# 导出结果到Excel文件
outexcel = os.path.join(outDATAS_DIR, 'outputfms.xlsx')
df_new.to_excel(outexcel, index=False)
def check_order(self):
self.source_select_db()
self.ods_select_db()
outputfbi = os.path.join(outDATAS_DIR, 'outputfbi.xlsx')
outputfms = os.path.join(outDATAS_DIR, 'outputfms.xlsx')
df_a = pd.read_excel(outputfbi)
df_b = pd.read_excel(outputfms)
# 创建新的DataFrame对象用于存储C表的数据
df_c = pd.DataFrame()
# 将A表的列写入C表
for col in df_a.columns:
df_c[col] = df_a[col]
# 将B表的列���入C表
for col in df_b.columns:
df_c[col] = df_b[col]
odsvalue=self.ods_db+'表-字段值'
fbivalue=self.source_db+'表-字段值'
# 比对A2和B2列的值,如果不一致,则在第5列写入"校验失败"
df_c['校验结果'] = ''
for i in range(len(df_c)):
if pd.notnull(df_c.at[i, fbivalue]) and pd.notnull(df_c.at[i, odsvalue]):
fbivalue_rounded = df_c.at[i, fbivalue]
odsvalue_rounded = df_c.at[i, odsvalue]
if isinstance(fbivalue_rounded, (int, float)):
fbivalue_rounded = round(fbivalue_rounded, 3)
elif isinstance(fbivalue_rounded, datetime.datetime):
fbivalue_rounded = round(fbivalue_rounded.timestamp(), 3)
else:
try:
fbivalue_rounded = round(float(fbivalue_rounded), 3)
except ValueError:
pass
if isinstance(odsvalue_rounded, (int, float)):
odsvalue_rounded = round(odsvalue_rounded, 3)
elif isinstance(odsvalue_rounded, datetime.datetime):
odsvalue_rounded = round(odsvalue_rounded.timestamp(), 3)
else:
try:
odsvalue_rounded = round(float(odsvalue_rounded), 3)
except ValueError:
pass
if fbivalue_rounded != odsvalue_rounded:
df_c.at[i, '校验结果'] = 'Fail'
else:
df_c.at[i, '校验结果'] = 'Success'
# 将结果写入到C.xlsx文件
df_c.to_excel('checkhead_result.xlsx', index=False)
# 打开C.xlsx文件并设置背景色
book = load_workbook('checkhead_result.xlsx')
writer = pd.ExcelWriter('checkhead_result.xlsx', engine='openpyxl')
writer.book = book
# 获取C.xlsx的工作表
sheet_name = 'Sheet1'
ws = writer.book[sheet_name]
# 设置背景色为红色
red_fill = PatternFill(start_color='FFFF0000', end_color='FFFF0000', fill_type='solid')
# 遍历校验结果列,将不一致的单元格设置为红色背景
for row in ws.iter_rows(min_row=2, min_col=len(df_c.columns), max_row=len(df_c), max_col=len(df_c.columns)):
for cell in row:
if cell.value == 'Fail':
cell.fill = red_fill
# 保存Excel文件
writer.save()
writer.close()
if __name__ == '__main__':
inputexcel = os.path.join(inputDATAS_DIR, 'input.xlsx')
DbcheckApi(inputexcel).check_order()