Python多线程和线程池的下载实战用法

发布时间:2023年12月29日

1.多线程和线程池用法区别

多线程和线程池都是Python中常用的并发编程方式,根据具体的需求和场景选择合适的方式。

  1. 多线程:

    • 优点:直观、简单,适合简单的并发任务。可以使用 threading.Thread 类创建线程,每个线程独立执行任务。
    • 缺点:线程的创建和销毁需要时间和资源消耗,如果任务数量较多,频繁地创建和销毁线程可能会影响性能。
  2. 线程池:

    • 优点:线程池可以重复利用已创建的线程,减少了线程创建和销毁的开销,提高了效率。可以使用 concurrent.futures.ThreadPoolExecutor 类创建线程池,并通过提交任务给线程池来执行任务。
    • 缺点:线程池的大小有限,如果任务数量超过线程池的最大工作线程数,任务就会排队等待执行。

根据实际情况,我们可以根据以下几个因素来选择合适的方式:

  • 任务类型:如果是一些独立无关、简单的任务,使用多线程即可;如果是需要复用线程、有限制的任务,使用线程池更合适。
  • 资源消耗:如果任务数量非常庞大,频繁地创建和销毁线程会浪费大量的资源,此时线程池更适合。
  • 控制并发度:线程池可以限制并发的最大数量,避免资源过度占用,控制并发度。

总结来说,多线程和线程池都有各自的优缺点,具体选择哪种方式取决于任务的特性和需求。如果任务数量较少且简单,多线程足够;如果任务数量较大且需要复用线程,线程池更合适。

2.Python线程池的实战下载用法

写法一:

当使用线程池进行下载时,如果需要为每个请求设置延迟而又不想使用time.sleep(),可以通过requests.get()函数的timeout参数来实现。这个参数不仅可以用于设置超时时间,还可以用于模拟延迟。

下面是一个示例代码,演示了如何使用Python线程池进行下载,并在每个请求中加入延迟:

import requests
import concurrent.futures
import time

urls = ['https://www.example.com/file1.txt',
        'https://www.example.com/file2.txt',
        'https://www.example.com/file3.txt',
        'https://www.example.com/file4.txt',
        'https://www.example.com/file5.txt']

def download_with_delay(url):
    print(f'Downloading {url}')
    response = requests.get(url, timeout=15)  # 设置连接和读取超时时间为5秒
    if response.status_code == 200:
        with open(url.split('/')[-1], 'wb') as f:
            f.write(response.content)
            print(f'{url} has been downloaded successfully')
    else:
        print(f'Failed to download {url}')

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(download_with_delay, url) for url in urls]

在上述示例中,我们定义了一个download_with_delay()函数来进行下载,并使用requests.get()函数的timeout参数来设置连接和读取的超时时间为5秒。这样做不仅可以控制超时,还可以模拟延迟的效果。

需要注意的是,timeout参数的第一个值是连接超时时间,第二个值是读取超时时间。在这个例子中,我们将它们都设置为5秒,你可以根据实际情况进行调整。

写法二

在 Python 中使用线程池进行下载可以通过 concurrent.futures 模块来实现,这里可以使用 ThreadPoolExecutor 来创建线程池。同时,你可以通过 requests.get() 方法的 timeout 参数来模拟延迟,而不使用 time.sleep()

在确定线程池中的 max_workers 参数时,通常可以根据系统的 CPU 核心数量来进行设置。一般情况下,将 max_workers 设置为 CPU 核心数量的 2 到 4 倍是比较合理的选择,具体取决于网络带宽和目标服务器的性能。

下面是一个示例代码,演示了如何使用线程池进行下载,并设置请求超时以模拟延迟:

import requests
import concurrent.futures

urls = ['https://www.example.com/file1.txt',
        'https://www.example.com/file2.txt',
        'https://www.example.com/file3.txt',
        'https://www.example.com/file4.txt',
        'https://www.example.com/file5.txt']

def download_file(url):
    print(f'Downloading {url}')
    try:
        response = requests.get(url, timeout=(5, 5))  # 设置连接和读取超时时间为5秒
        if response.status_code == 200:
            with open(url.split('/')[-1], 'wb') as f:
                f.write(response.content)
                print(f'{url} has been downloaded successfully')
        else:
            print(f'Failed to download {url}')
    except requests.exceptions.RequestException as e:
        print(f'Error occurred while downloading {url}: {e}')

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:  # 根据实际情况调整 max_workers 的数量
    executor.map(download_file, urls)

在上述示例中,我们使用了 concurrent.futures.ThreadPoolExecutor 创建了一个拥有多个线程的线程池,并使用 executor.map() 方法来提交下载任务。同时,我们在 requests.get() 方法中设置了超时参数来模拟延迟。

需要注意的是,max_workers 参数的值应该根据实际情况进行调整。一般来说,可以先尝试将其设置为 8 或 16,然后根据下载速度和系统负载情况进行调整。

线程池两种写法的区别

这两段代码都使用了 concurrent.futures.ThreadPoolExecutor 来进行并发任务的执行,但有一些细微的区别。

第一段代码使用了列表推导式和 executor.submit() 方法来提交任务到线程池。它会遍历 urls 列表,对于每个 URL 调用 download_with_delay() 方法,并通过 executor.submit() 将任务提交给线程池。这样可以异步地执行每个任务,并返回一个 Future 对象,可以用于获取任务的结果或异常。

with concurrent.futures.ThreadPoolExecutor() as executor:
    [executor.submit(download_with_delay, url) for url in urls]

第二段代码使用了 executor.map() 方法来提交任务到线程池。与第一段代码不同,它不需要使用列表推导式,而是直接将 urls 列表作为参数传递给 executor.map() 方法。executor.map() 方法会自动遍历 urls 列表,并以函数 download_file() 为参数,将每个 URL 作为输入进行调用。这样可以更简洁地实现并发执行任务的操作。

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(download_file, urls)

总结起来,两者的主要区别在于任务提交的方式。第一段代码使用 executor.submit() 提交单个任务,适用于需要对每个任务进行额外处理的情况。而第二段代码使用 executor.map() 提交批量任务,适用于简单地对每个任务进行相同处理的情况。

executor.map() 的拓展

executor.map() 中,参数 urls 需要是一个可迭代的对象,例如列表、元组或集合。字典类型是不支持的,因为它不是可迭代的对象。

如果你想要使用字典类型,你可以将字典的键或值转换为一个可迭代的对象,然后将该对象传递给 executor.map()。例如,你可以使用 urls.keys()urls.values() 来获取字典的键或值的可迭代对象,然后将其传递给 executor.map()

下面是一个示例代码,演示了如何将字典的键作为可迭代对象传递给 executor.map()

import concurrent.futures

def download_file(url):
    # 下载文件的逻辑
    pass

urls = {
    'file1': 'http://example.com/file1',
    'file2': 'http://example.com/file2',
    'file3': 'http://example.com/file3'
}

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(download_file, urls.keys())

在上述代码中,urls.keys() 返回字典 urls 的键的可迭代对象,然后将其作为参数传递给 executor.map() 方法。

同样的方式也适用于将字典的值作为可迭代对象传递给 executor.map()

python多线程的实战下载用法

使用Python多线程进行下载可以提高下载速度,并且可以同时下载多个文件,从而提高下载效率。下面是一个示例代码,演示了如何使用Python多线程进行下载:

import requests
import threading

urls = ['https://www.example.com/file1.txt',
        'https://www.example.com/file2.txt',
        'https://www.example.com/file3.txt',
        'https://www.example.com/file4.txt',
        'https://www.example.com/file5.txt']

def download_file(url):
    print(f'Downloading {url}')
    response = requests.get(url)
    if response.status_code == 200:
        with open(url.split('/')[-1], 'wb') as f:
            f.write(response.content)
            print(f'{url} has been downloaded successfully')
    else:
        print(f'Failed to download {url}')

threads = []
for url in urls:
    thread = threading.Thread(target=download_file, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

在上述示例中,我们定义了一个download_file()函数来进行下载,然后使用threading.Thread类来创建线程,并将每个文件的下载任务分配给不同的线程。最后,我们等待所有线程都结束执行,才退出程序。

需要注意的是,使用多线程下载时,要注意控制并发数量,避免对服务器造成过大的压力。此外,在下载大文件时,还需要考虑文件的分块下载和断点续传等问题,以避免网络故障或其他异常情况导致下载失败。

拓展1:线程池最大线程数的设定

当确定线程池的大小时,通常建议将线程池的最大线程数设置为CPU核心数的2到4倍。根据你提供的信息,如果你的计算机具有16个CPU核心,那么一个合理的线程池大小范围可能是32到64。

然而,线程池的最佳大小也取决于其他因素,例如你的应用程序的性质、任务的类型和计算机的其他负载等。在设置线程池大小时,你可以进行实验并观察应用程序的性能表现,以找到最佳的线程池大小。

另外,还要注意,线程池的大小不一定是越大越好。如果线程池过大,可能会导致资源竞争和上下文切换开销增加,从而降低性能。因此,需要根据具体情况找到适合你应用程序的最佳线程池大小。

拓展2:进程池的用法

进程池是Python中常用的并发编程方式之一,主要用于处理I/O密集型任务,如文件读写、网络请求等。相比于多线程,进程池具有以下几个优点:

  1. 进程之间相互独立,不会出现一个进程挂掉导致其他进程也受到影响的情况。
  2. 进程池能够更好地利用多核CPU,并且在进程切换时,不会受到GIL的限制。
  3. 进程池可以通过限制最大工作进程数来控制并发度,避免资源过度占用。

因此,当我们需要处理大量的I/O密集型任务时,可以选择使用进程池。例如,在爬取网页数据时,每个线程需要等待网络IO操作完成才能继续执行,此时使用进程池能够更好地利用CPU资源,提高程序效率。

下面是一个简单的使用multiprocessing.Pool创建进程池的示例:

import multiprocessing

def task(task_name):
    print(f"Task {task_name} is running in process {multiprocessing.current_process().name}")
    # do something...

if __name__ == '__main__':
    # 创建进程池
    with multiprocessing.Pool(processes=4) as pool:
        # 提交任务给进程池
        for i in range(10):
            pool.apply_async(task, args=(f"Task-{i}",))

        # 等待所有任务执行完成
        pool.close()
        pool.join()

在上面的示例中,我们首先创建了一个包含4个工作进程的进程池。然后,使用 apply_async() 方法向进程池提交了10个任务,每个任务都会打印相应的消息,并执行一些操作。

最后,我们使用进程池的 close()join() 方法等待所有任务完成并释放资源。

你可以根据实际需求,将具体的任务函数替换到 task 函数中,并调整进程池的大小和具体的任务提交方式。

拓展3:通过Python查看CPU信息

你可以使用 Python 中的 os 模块来查看 CPU 的信息。具体来说,可以使用以下代码来获取 CPU 的逻辑个数、物理核心个数和当前使用率:

import os

# 获取CPU逻辑个数
cpu_count = os.cpu_count()
print("CPU 逻辑个数:", cpu_count)

# 获取CPU物理核心个数
with open("/proc/cpuinfo", "r") as f:
    cpuinfo = f.readlines()

processor_count = 0
for line in cpuinfo:
    if "physical id" in line:
        processor_count += 1

print("CPU 物理核心个数:", processor_count)

# 获取CPU使用率
cpu_percent = os.cpu_percent(interval=1)
print("CPU 使用率:", cpu_percent)

这段代码会输出当前系统的 CPU 逻辑个数、物理核心个数以及当前 CPU 的使用率。

总结

多线程——IO密集型,例如文件下载
多进程——CPU密集型,例如文件读写

文章来源:https://blog.csdn.net/m0_57021623/article/details/135253726
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。