python线程池提交任务

发布时间:2024年01月07日

1. 线程池参数设置

  1. CPU数量:N
  2. 线程池的核心线程数量
    IO密集型的话,一般设置为 2 * N + 1
    CPU密集型的话,一般设置为 N + 1 或者 使用进程池。
  3. 线程池的最大任务队列长度
    (线程池的核心线程数 / 单个任务的执行时间)* 2
    如果线程池有10个核心线程,单个任务的执行时间为0.1s,那么最大任务队列长度设置为200。
from concurrent.futures import ThreadPoolExecutor
thread_pool = ThreadPoolExecutor(max_workers=10)

2. submit方式提交

submit 这种提交方式是一条一条地提交任务:
1. 可以提交不同的任务函数;
2. 线程池的线程在执行任务时出现异常,程序不会停止,而且也看不到对应的报错信息;
3. 得到的结果是乱序的。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_task(delay):
    print(f"------------> start to execute task {delay} <------------")
    time.sleep(delay)
    print(f"------------> task {delay} execute over !!! <------------")
    return delay + 10000

task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)


############################### 方式1. 虽然是异步提交任务,但是却是同步执行任务。
for p in task_params:
    future = thread_pool.submit(run_task, p)
    print(future.result())      # 直接阻塞当前线程,直到任务完成并返回结果,即变成同步


############################### 方式2. 异步提交任务,而且异步执行任务,乱序执行,结果乱序。
future_list = []
for p in task_params:
    future = thread_pool.submit(run_task, p)
    future_list.append(future)

for res in as_completed(future_list):       # 等待子线程执行完毕,先完成的会先打印出来结果,结果是无序的
    print(f"get last result is {res.result()}")

3. map方式提交

submit 这种提交方式可以分批次提交任务:

  1. 每个批次提价的任务函数都相同;
  2. 线程池的线程在执行任务时出现异常,程序终止并打印报错信息;
  3. 得到的结果是有序的。
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_task(delay):
    print(f"------------> start to execute task {delay} <------------")
    time.sleep(delay)
    print(f"------------> task {delay} execute over !!! <------------")
    return delay + 10000

task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 5      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)

task_res = thread_pool.map(run_task, task_params)		# 批量提交任务,乱序执行
print(f"main thread run finished!")
for res in task_res:		# 虽然任务是乱序执行的,但是得到的结果却是有序的。
    print(f"get last result is {res}")

4. 防止一次性提交的任务量过多

import time
from concurrent.futures import ThreadPoolExecutor

def run_task(delay):
    print(f"------------> start to execute task <------------")
    time.sleep(delay)
    print(f"------------> task execute over !!! <------------")

task_params = [1, 4, 2, 5, 3, 6] * 100
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)
threadpool_max_queue_size = 200     # 线程池任务队列长度一般设置为  (线程池核心线程数/单个任务执行时间)* 2

for p in task_params:
    print(f"*****************> 1. current queue size of thread pool is {thread_pool._work_queue.qsize()}")
    while thread_pool._work_queue.qsize() >= threadpool_max_queue_size:
        time.sleep(1)		# sleep时间要超过单个任务的执行时间
    print(f"*****************> 2. current queue size of thread pool is {thread_pool._work_queue.qsize()}")
    thread_pool.submit(run_task, p)

print(f"main thread run finished!")
文章来源:https://blog.csdn.net/CSDN1csdn1/article/details/135439064
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。