? ? ? ? 这个架构是博主在使用Django的过程中总结出来的一些经验之谈,只是作为一个经验的分享和记录,写的比较随意,仅供参考,博主也会不断补充,欢迎大家交流。
????????博主更倾向于使用类对象的方式去完成功能的实现,大部分的方法返回结果是一个字典,包含数据、警告信息、错误信息。
data_dict = {'data': "", "warning_info": "", "error_info": "", }
return data_dict
? ? ? ? 此方法的主要作用是,使用线程同时多次处理指定函数(并且携带一个参数),次方法只是实现调度的功能,具体执行的操作,还是指定的函数操作,例如我传入的函数所实现的功能是打印传入的参数,调用此方法就能实现线程同时打印多个传入的参数。具体的要执行的业务功能取决与传入的函数。
? ? ? ? 此方法的大致执行逻辑:根据条件每批次创建指定个数的线程,然后每批进行等待指定时长,最后会对所有线程的返回结果进行汇总,如果指定函数的返回结果数据是列表,则不断扩展这个列表,如果返回的是字典,则不断扩展这个列表,如果都不是这两种类型,会将返回结果全部追加到一个新的列表。对于错误或警告信息,都会追加汇总。
# 创建线程任务
def create_tasks(data_list, func, batch_size=2,min_wait_time=1, max_wait_time=3):
"""
:param data_list: 需要创建任务的列表
:param func: 任务函数名称
:param batch_size: 每批任务数量
:param min_wait_time: 每批任务最短等待时长(秒)
:param max_wait_time: 每批任务最长等待时长(秒)
:return: 返回数字字典
"""
data_dict = {
'data': "",
"warning_info": "",
"error_info": "",
}
unknown_data_list = []
data_content_dict = {}
data_content_list = []
data_type = "else"
for i in range(0, len(data_list), batch_size):
all_tasks = []
split_list = data_list[i:i + batch_size]
execute = ThreadPoolExecutor(batch_size)
for img_url in split_list:
all_tasks.append(execute.submit(func, img_url))
for future in as_completed(all_tasks):
future_result_dict = future.result()
error_info = future_result_dict['error_info']
warning_info = future_result_dict['warning_info']
if error_info:
data_dict['error_info'] += error_info + "\n"
elif warning_info:
data_dict['warning_info'] += warning_info + "\n"
else:
if isinstance(future_result_dict["data"], list):
data_type = "list"
data_content_list.extend(future_result_dict["data"])
elif isinstance(future_result_dict["data"], dict):
data_type = "dict"
data_content_dict.update(future_result_dict["data"])
else:
# data_dict['error_info'] += "非列表或字典对象" + "\n"
# print("返回的数据类型为:",type(future_result_dict["data"]))
unknown_data_list.append(future_result_dict["data"])
execute.shutdown(wait=True)
time.sleep(random.uniform(min_wait_time, max_wait_time))
if data_type == "list":
return_data = data_content_list
elif data_type == "dict":
return_data = data_content_dict
else:
return_data = unknown_data_list
data_dict["data"] = return_data
return data_dict
? ? ? ? 此方法的主要作用是,对传入的函数返回结果进行更新,不断追加错误和警告信息,覆盖为新的结果数据。
? ? ? ? 此方法的大致执行逻辑:传递两个数据字典,第一个是原有的数据字典、第二个是新的数据字段,对错误和警告信息进行去重追加。
# 处理返回的数据
def handle_result_dict(data_dict, result_dict, is_first=False):
warning_info_list = [info for info in result_dict["warning_info"].split("\n") if info]
result_dict["warning_info"] = "\n".join(list(set(warning_info_list)))
error_info_list = [info for info in result_dict["error_info"].split("\n") if info]
result_dict["error_info"] = "\n".join(list(set(error_info_list)))
if is_first:
return result_dict
if result_dict['error_info']:
data_dict['error_info'] += result_dict['error_info'] + '\n'
if result_dict['warning_info']:
data_dict['warning_info'] += result_dict['warning_info'] + '\n'
data_dict["data"] = result_dict["data"]
return data_dict
? ? ? ? 此方法的主要作用是,对指定网址发送请求,获取response对象。
? ? ? ? 此方法的大致执行逻辑:对指定网址发送get或post请求,如果需要携带参数,可将参数一同放入kwargs中,进行解包裹传递。程序会先判断响应状态码是否为200,如果不为200,则添加错误信息。
# 发送请求
class SendRequest:
def __init__(self, url, request_type, **kwargs):
"""
对指定网址发送请求,返回字典数据,返回的数据为响应体
:param url: 请求的网址
:param request_type: 请求的类型:GET或POST
:param kwargs: 其余携带的参数,包括headers、cookie、proxies等
"""
self.url = url
self.request_type = request_type
self.request_data = kwargs.get("request_data", None)
self.headers = kwargs.get("headers", None)
self.cookie = kwargs.get("cookie", None)
self.proxies = kwargs.get("proxies", None)
# 发送请求
def send_reqeust(self):
data_dict = {'data': "", "warning_info": "", "error_info": "", }
params = {
'url': self.url,
'method': self.request_type,
'headers': self.headers,
'proxies': self.proxies,
'cookies': self.cookie
}
# Get请求
if self.request_type == 'GET':
params["params"] = self.request_data
# Post请求
elif self.request_type == 'POST':
params["data"] = self.request_data
else:
data_dict["error_info"] = f"所设置的请求类型无效"
return data_dict
# print(params)
# return data_dict
# 发起请求
try:
response = requests.request(**params)
if response.status_code != 200:
data_dict["error_info"] = f"【{self.url}】:状态码不等于200,响应的状态码为【{response.status_code}】"
return data_dict
except Exception as e:
data_dict["error_info"] = f"【{self.url}】:发送请求失败,{str(e)}"
return data_dict
data_dict["data"] = response
response.close()
return data_dict
????????此方法的主要作用是,创建结果文件夹或拼接具体的文件名。
? ? ? ? 此方法的大致执行逻辑:在指定的路径下创建一个名为结果文件的文件夹,或是在该结果文件夹下拼接加上时间的文件名。
# 创建结果文件或文件夹
def create_result_file_or_folder(app_path, create_type="file", file_base_name=None, suffix_type="xlsx"):
"""
:param app_path: 创建的文件的路径
:param create_type: 创建文件的类型,默认是【单独的文件】,否则为文件夹
:param file_base_name: 文件的名字(不用带后缀)
:param suffix_type: 文件类型,默认是【xlsx】
:return: 返回数据字典
"""
data_dict = {
"data": "",
"warning_info": "",
"error_info": "",
}
date = datetime.datetime.today().date()
result_dir = os.path.join(app_path, '结果文件', str(date))
# 创建结果文件夹目录
if not os.path.exists(result_dir):
os.makedirs(result_dir)
# 判断创建文件的类型
if create_type == "file":
now_time = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
try:
file_path = os.path.join(result_dir, f'{file_base_name}-{now_time}.{suffix_type}')
except Exception as e:
data_dict["error_info"] = "文件创建失败,请检查文件命名"
return data_dict
data_dict["data"] = file_path
else:
data_dict["data"] = result_dir
return data_dict
更多的架构模块还在不断补充中.........