Python进阶:Futures并发编程

发布时间:2023年12月29日

一、并行和并发

首先需要区分Python的并行和并发

  • 并发:在Python中,并发并不是指同一时刻有多个操作(thread、task)同时进行。相反,某个特定的时刻,它只允许有一个操作发生,只不过线程/任务之间会互相切换,直至完成
  • 并行:指的是在同一时刻、同时发生。Python中的multi-processing便是这个意思,对于multi-processing,可以简单地这么理解:比如电脑是6核处理器,那么在运行程序时,就可以强制Python开6个进程,同时执行,以加快运行速度

什么场景应用并行和并发呢?

  • 并发通常应用于I/O操作频繁的场景,比如你要从网站上下载多个文件,I/O操作的时间可能会比CPU运行处理的时间长的多
  • 并行更多应用于CPU heavy的场景,比如MapReduce中的并行计算,为了加快运行速 度,一般会用多台机器,多个处理器来完成

二、Futures并发

1、单线程示例

import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content),url))

def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Portal:Computer_science',
        'https://en.wikipedia.org/wiki/Portal:Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Portal:Java_(programming_language)',
        'https://en.wikipedia.org/wiki/Portal:PHP',
        'https://en.wikipedia.org/wiki/Portal:Node.js',
        'https://en.wikipedia.org/wiki/Portal:The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Portal:Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} second'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

2、多线程示例

import concurrent.futures

import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content),url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector:
        exector.map(download_one, sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Portal:Computer_science',
        'https://en.wikipedia.org/wiki/Portal:Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Portal:Java_(programming_language)',
        'https://en.wikipedia.org/wiki/Portal:PHP',
        'https://en.wikipedia.org/wiki/Portal:Node.js',
        'https://en.wikipedia.org/wiki/Portal:The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Portal:Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} second'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

上述两段代码,单线程和多线程版的主要区别在于:

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector:
        exector.map(download_one, sites)

上述代码表示,创建了个线程池,总共有5个线程可以分配使用,map高阶函数表示并发的对sites的每一个元素调用函数download_one()
也可以通过并行的方式运行上述代码

with futures.ThreadPoolExecutor(workers) as executor
	=>
with futures.ProcessPoolExecutor() as executor

在上述代码中,ProcessPoolExecutor()表示创建进程池,使用多个进程并行,这里,通常省略参数workers,因为系统会自动返回CPU的数量作为可以调用的进程数,但是使用多进程效果并不一定显著,因为并行的方式一般用在CPU heavy的场景中,而上述场景是I/O heavy

3、详解Futures

  • Python中的Futures模块,位于concurrent.futures和asyncio中,它们都表示带有延迟的操作。Futures会将处于等待状态的操作包裹起来放在队列中
  • 作为用户,不用考虑如何去创建Futures,这些Futures底层都会帮我们处理好。我们要做的是去调度这些Futures的执行

4、常用方法
submit()
除了上述的map调用方式,还可以使用submit方法来调用:

  • submit接受一个函数及它的参数列表,不会立即执行函数,而是在适当的时机由submit()方法在线程池中执行
def download_all(sites):
	with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
		executor.submit(download_one,site)

result()

  • result():用于获取 Future 对象的最终结果。如果任务还未完成,调用 result() 方法会阻塞直到任务完成并返回结果。
def download_all(sites):
	with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
		future = executor.submit(download_one,site)
		result = future.result()
		print(result)

如果download_one函数有返回值,那么result就是返回值,若没有,返回值为None

as_completed()

  • as_completed():函数,返回一个迭代器,用于迭代已完成的 Future 对象。它允许你以异步的方式处理已完成任务的结果。
def download_all(sites):
	with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
		futures = [executor.submit(download_one,site) for site in sites]
	for future in concurrent.futures.as_completed(futures):
		result = future.result()
		print(result)

三、为什么多线程每次只能有一个线程执行?

  • Python的解释器并不是线程安全的,为了解决由此带来的race condition等问题,Python便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行
文章来源:https://blog.csdn.net/nzbing/article/details/135274133
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。