在导入 Excel 时,客户要求模板不是固定模板,而是可以动态删除某些列,以下是实现方式。
只需知道要导入哪些重要信息即可。
下面的代码也可以生成错误报告等。
def generate_error_excel(self, worksheet, workbook, row, row_line, error_reason):
    """Write one row of the error-report workbook and store the report.

    When ``row`` is 0 the header is written: every title in ``row_line``
    followed by an extra '错误提示' column, and the width of each copied
    column is set. For any other ``row`` the cells of ``row_line`` are
    written at that row followed by ``error_reason``; the workbook is then
    serialised and stored base64-encoded in ``error_file`` together with an
    ``error_name`` built from ``self.file_name``.

    :param worksheet: sheet to write into (``write`` and ``col`` are used).
    :param workbook: workbook that is serialised with ``save``.
    :param row: 0 for the header row, otherwise the target row index.
    :param row_line: list of cell values to copy.
    :param error_reason: text for the extra trailing column; ignored for
        the header row.
    """
    if row == 0:
        for col, title in enumerate(row_line):
            worksheet.write(0, col, title)
            worksheet.col(col).width = 15 * 367
        worksheet.write(0, len(row_line), '错误提示')
        # Header only: there is nothing to report yet, so a header-only
        # workbook is never stored as an error report.
        return

    for col, value in enumerate(row_line):
        worksheet.write(row, col, value)
    worksheet.write(row, len(row_line), error_reason)

    # Re-serialise after every rejected row so that error_file always holds
    # the report accumulated so far.
    buffer = io.BytesIO()
    workbook.save(buffer)
    self.update({'error_file': base64.encodebytes(buffer.getvalue()),
                 'error_name': '错误报告-{}'.format(self.file_name)})
def btn_confirm(self):
    """Import the uploaded Excel file and report the rows that were rejected.

    The first sheet of ``self.file_upload`` is read. Its header row must
    contain the columns 采购订单号, 货号 and 行号; which other columns are
    used is decided by ``judge_name_row``. For every data row the purchase
    order, the product and the purchase line are searched. A row for which
    one of them is not found is copied into an error workbook through
    ``generate_error_excel``; every other row creates a
    ``uoms.tracking.import`` record.

    :return: an ``ir.actions.act_window`` opening the error wizard when at
        least one row was rejected, otherwise a ``display_notification``
        client action announcing success.
    :raises UserError: for any exception raised while reading or importing
        the sheet, including a missing required column.
    """
    workbook = xlwt.Workbook(encoding='utf-8')
    worksheet = workbook.add_sheet('导入数据问题', cell_overwrite_ok=True)
    purchase_model = self.env['uoms.tracking.purchase'].sudo()
    purchase_line_model = self.env['uoms.tracking.purchase.line'].sudo()
    product_model = self.env['product.product'].sudo()
    abnormal_state_model = self.env['uoms.abnormal.state'].sudo()
    inventory_status_model = self.env['uoms.inventory.status'].sudo()
    # Number of rejected rows so far; also the next free row of the report.
    error_row = 0
    try:
        # Opened inside the try block so that an empty or unreadable upload
        # is reported as a UserError like every other failure.
        wb = xlrd.open_workbook(file_contents=base64.decodebytes(self.file_upload or b''))
        sheet1 = wb.sheet_by_name(wb.sheet_names()[0])
        sheet1_row = sheet1.row_values(0)
        if '采购订单号' not in sheet1_row:
            raise UserError(_("请添加列 采购订单号!"))
        if '货号' not in sheet1_row:
            raise UserError(_("请添加列 货号!"))
        if '行号' not in sheet1_row:
            raise UserError(_("请添加列 行号!"))
        # Header title -> column index.
        sheet1_dict = {title: index for index, title in enumerate(sheet1_row)}
        field_data = self.judge_name_row(sheet1_dict)
        self.generate_error_excel(worksheet, workbook, 0, sheet1_row, '')

        for row in range(1, sheet1.nrows):
            row_line = sheet1.row_values(row)
            # Start from the raw, non-empty cells of every recognised column.
            field_value_dict = {
                key: row_line[index]
                for key, index in field_data.items()
                if row_line[index]
            }

            purchase_code = row_line[field_data['purchase_code']]
            product_code = row_line[field_data['product_code']]
            if isinstance(product_code, float):
                # A float cell is turned into an int so the code does not
                # carry a trailing ".0" when converted to text.
                product_code = int(product_code)
            line_number = row_line[field_data['line_number']]
            if isinstance(line_number, float):
                line_number = int(line_number)

            purchase_id = purchase_model.search([('code', '=', str(purchase_code).strip())])
            if not purchase_id:
                error_row += 1
                self.generate_error_excel(worksheet, workbook, error_row, row_line,
                                          '未匹配到号为{}的采购订单号'.format(str(purchase_code).strip()))
                continue

            product_id = product_model.search([('default_code', '=', str(product_code).strip())])
            if not product_id:
                error_row += 1
                self.generate_error_excel(worksheet, workbook, error_row, row_line,
                                          '未匹配货号为{}的物料'.format(str(product_code).strip()))
                continue

            # A line number that is not numeric cannot match any purchase
            # line; report the row instead of aborting the whole import.
            try:
                line_key = str(int(line_number))
            except (TypeError, ValueError):
                line_key = None
            if line_key is None:
                purchase_line_id = purchase_line_model.browse()
            else:
                purchase_line_id = purchase_line_model.search(
                    [('kingee_purchase_number', '=', line_key),
                     ('uoms_purchase_id', '=', purchase_id.id),
                     ('product_id', '=', product_id.id)])
            if not purchase_line_id:
                error_row += 1
                self.generate_error_excel(worksheet, workbook, error_row, row_line,
                                          '采购订单+行号+物料与系统内信息不一致')
                continue

            field_value_dict['purchase_id'] = purchase_id.id
            field_value_dict['purchase_line_id'] = purchase_line_id.id
            field_value_dict['product_id'] = product_id.id
            foreign_planned_date, domestic_planned_date, next_date = self.get_date(field_data, row_line)
            # Membership test rather than .get(): column index 0 is a valid
            # position but is falsy.
            if 'unusual' in field_data:
                unusual_id = abnormal_state_model.search(
                    [('name', '=', str(row_line[field_data['unusual']]).strip())])
                field_value_dict['unusual_id'] = unusual_id.id if unusual_id else False
            if 'inventory_id' in field_data:
                inventory_id = inventory_status_model.search(
                    [('name', '=', str(row_line[field_data['inventory_id']]).strip())])
                field_value_dict['inventory_id'] = inventory_id.id if inventory_id else False
            if foreign_planned_date:
                field_value_dict['foreign_planned_date'] = foreign_planned_date
            if domestic_planned_date:
                field_value_dict['domestic_planned_date'] = domestic_planned_date
            if next_date:
                field_value_dict['next_date'] = next_date
            self.env['uoms.tracking.import'].with_context(from_interface=True).create(field_value_dict)
    except Exception as e:
        # Always surface the failure, even when the exception has no
        # message text.
        raise UserError('发生错误(%s), 请联系!!' % str(e)) from e

    # Decide on the rows rejected in this run rather than on error_file,
    # which may hold a value that was not produced by this run.
    if error_row:
        return {
            "name": _("导入问题向导"),
            "view_type": "form",
            "view_mode": "form",
            "res_model": "tracking.import.wizard",
            "res_id": self.id,
            "view_id": False,
            "views": [[self.env.ref("uoms_tracking.tracking_import_error_form").id, "form"]],
            "type": "ir.actions.act_window",
            "target": "new",
        }
    return {
        'type': 'ir.actions.client',
        'tag': 'display_notification',
        'params': {
            'type': 'success',
            'title': _('消息'),
            'message': _('导入成功!'),
            'sticky': False,
            'next': {'type': 'ir.actions.act_window_close'},
        }
    }
def judge_name_row(self, name_dict):
    """Translate the sheet's header titles into import field keys.

    ``name_dict`` maps a header title to its column index. The titles
    采购订单号, 货号 and 行号 are read unconditionally, so a missing one
    raises ``KeyError``; every other known title is picked up only when it
    is present.

    :param name_dict: mapping of header title -> column index.
    :return: dict of field key -> column index.
    """
    columns = {
        'purchase_code': name_dict['采购订单号'],
        'product_code': name_dict['货号'],
        'line_number': name_dict['行号'],
    }
    # (header title, field key) pairs for the columns a template may omit.
    optional_columns = (
        ('供应商订单号', 'supplier_code'),
        ('下次跟单日期', 'next_date'),
        ('异常情况', 'unusual'),
        ('异常数量', 'unusual_qty'),
        ('库存情况', 'inventory_id'),
        ('厂家反馈', 'manufacturer_feedback'),
        ('采购反馈', 'purchase_feedback'),
        ('国外-计划发货日期', 'foreign_planned_date'),
        ('国内-计划发货日期', 'domestic_planned_date'),
        ('计划发货数量', 'planned_shipment_quantity'),
    )
    for title, key in optional_columns:
        if title in name_dict:
            columns[key] = name_dict[title]
    return columns
def get_date(self, field_data, row_line):
    """Return the foreign, domestic and next follow-up dates of one row.

    For each of the keys ``foreign_planned_date``, ``domestic_planned_date``
    and ``next_date`` the cell at the column index stored in ``field_data``
    is read from ``row_line``. A float cell is converted with
    ``xldate_as_datetime`` (datemode 0); any other non-empty cell is parsed
    as text in ``%Y-%m-%d`` format.

    :param field_data: mapping of field key -> column index; a key that is
        absent means the sheet has no such column.
    :param row_line: list of cell values of the row.
    :return: tuple ``(foreign_planned_date, domestic_planned_date,
        next_date)`` of ``datetime.date`` values, ``None`` where the column
        is absent or the cell is empty.
    :raises ValueError: when a text cell does not match ``%Y-%m-%d``.
    """
    def _read_date(key):
        # Membership test rather than .get(): column index 0 is a valid
        # position but is falsy.
        if key not in field_data:
            return None
        raw = row_line[field_data[key]]
        if isinstance(raw, str):
            # Tolerate whitespace around a date typed as text.
            raw = raw.strip()
        if not raw:
            return None
        if isinstance(raw, float):
            return xldate_as_datetime(raw, 0).date()
        return datetime.datetime.strptime(raw, "%Y-%m-%d").date()

    return (
        _read_date('foreign_planned_date'),
        _read_date('domestic_planned_date'),
        _read_date('next_date'),
    )