阿里云
pip install -i https://mirrors.aliyun.com/pypi/simple/ <package_name>
清华大学
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ <package_name>
中国科学技术大学
pip install -i https://pypi.mirrors.ustc.edu.cn/simple/ <package_name>
多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会等待IO完成。
多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务。
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行。
使用Lock对资源加锁,防止冲突访问。
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式。
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果。
使用subprocess启动外部程序的进程,并进行输入输出交互。
Python的并发编程有三种方式:
CPU密集型计算(CPU-bound)
CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高。主要是受到CPU的限制。
例如:压缩解压缩,加密解密,正则表达式搜索
IO密集型计算(I/O bound)
IO密集型指的是系统运作大部分的状况是CPU在等I/O(硬盘/内存)的读/写操作,CPU占用率仍然较低。主要是受到IO的限制。
例如:文件处理程序,网络爬虫程序,读写数据库程序
多线程,多进程,多协程的对比:
多进程 Process(multiprocessing模块)
多线程 Thread(threading模块)
多线程只能同时使用一个CPU
优点:相比进程,更轻量级,占用资源少。
缺点:
适用于:IO密集型计算,同时运行的任务数目要求不多。
多协程 Coroutine(asyncio模块)
全局解释器锁(GlobalInterpreter Lock,缩写GIL),是Python解释器的一个特性,它是为了解决Python在多线程编程中可能出现的线程安全问题而引入的。GIL是一种互斥锁(Mutex),它保证了任何时刻只有一个线程可以执行Python字节码。
是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。
GIL会导致多线程程序的性能受限。由于只有一个线程可以执行Python字节码,所以在CPU密集型任务中,多线程并不能充分利用多核处理器的优势。但在IO密集型任务中,线程的等待时间可以释放GIL,多线程可以提高性能。
为了解决多线程之间数据完整性和状态同步问题。
Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。
每个Python对象都有一个内置的引用计数器,当对象被创建时,其引用计数为1。每当有新的引用指向这个对象(如赋值、函数传递等操作),该对象的引用计数会增加;当不再有引用指向这个对象时,引用计数就会减少。
当一个对象的引用计数达到0时,Python解释器会认为该对象不再被任何变量或数据结构所引用,因此可以安全地释放其占用的内存空间。这种基于引用计数的内存管理方式简单高效,实时性较好,能够及时回收不再使用的内存资源。
开始:线程A和线程B都引用了对象obj,obj.ref_num = 2 线程A和线程B都想撤销对obj的引用。
错误:因为obj已经不存在了,最后的这两行代码可能破坏内存。
规避 GIL 限制的方式有如下两种:
方法如下:
def my_func(a,b):
do_craw(a,b)
import threading
t = threading.Thread(target=my_func,args=(100,200))
t.start()
t.join()
爬虫程序
import requests
urls = [
f"https://www.cnblogs.com/#p{page}"
for page in range(1, 50+1)
]
def craw(url):
res = requests.get(url)
# r.text是其中一个属性,它代表从服务器接收到的响应体内容,以字符串形式表示。
print(url, len(res.text))
return res.content
# craw(urls[0])
print(craw(urls[0]).decode("utf8"))
多线程爬取
import wuhu
import threading
import time
# 单线程
def single_thread():
print("single_thread begin")
for url in wuhu.urls:
wuhu.craw(url)
print("single_thread end")
# 多线程
def multi_thread():
print("multi_thread begin")
# 定义空列表 threads,用于存储将要创建的线程对象。
threads = []
for url in wuhu.urls:
threads.append(
threading.Thread(target=wuhu.craw, args=(url,))
)
for thread in threads:
# 启动线程
thread.start()
for thread in threads:
# 等待结束
thread.join()
print("multi_thread end")
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("single_thread cost:", end - start, "seconds")
print("============================================")
start = time.time()
multi_thread()
end = time.time()
print("multi_thread cost:", end - start, "seconds")
参数解析:
target
: 这是必需的参数,指定了在线程启动后要执行的目标函数或方法。在本例中,target=wuhu.craw
表示当线程开始运行时,将调用名为wuhu
模块中的 craw
的函数。args
: 这是一个可选参数,用于传递给目标函数的参数元组。这里的 (url, )
表示将参数 url
传入到 craw
函数中。所以当新创建的线程开始执行时,它会以 craw(url)
的形式调用目标函数。单线程爬虫运行结果:
多线程爬虫运行结果:
Python中生产者消费者模式多线程的使用场景
queue.Queue
或者RabbitMQ、Kafka等消息中间件)。复杂的事情都不会一下做完,而是会分为很多中间步骤一步步的完成。
queue.Queue可用于多线程之间的,线程安全的数据通信
在Python中,生产者和消费者是多线程或多进程编程模型中的两个关键角色,用于描述一种典型的并发编程问题——生产者消费者问题。这种模型有助于解决数据的同步和通信问题,尤其是在涉及资源有限(如缓冲区大小有限)的情况下。
生产者(Producer):
消费者(Consumer):
import requests
from bs4 import BeautifulSoup
# Python的列表推导式
urls = [
# f"https://www.cnblogs.com/#p{page}"
f"https://www.cnblogs.com/sitehome/p/{page}"
for page in range(1, 50+1)
]
# 生产者
def craw(url):
res = requests.get(url)
# r.text是其中一个属性,它代表从服务器接收到的响应体内容,以字符串形式表示。
# print(url, len(res.text))
return res.text
# 消费者
def parse(html):
# class="post-item-title"
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
if __name__ == "__main__":
# print(urls[2])
for result in parse(craw(urls[1])):
print(result)
代码解析:
from bs4 import BeautifulSoup
是在Python中导入BeautifulSoup库的一个语句。BeautifulSoup是一个用于解析HTML和XML文档的Python库,它能够帮助我们以结构化的方式访问和操作网页内容。
soup = BeautifulSoup(html, "html.parser")
是使用BeautifulSoup库将传入的HTML字符串转换成一个BeautifulSoup对象。这里使用了Python内置的HTML解析器 "html.parser"
。
links = soup.find_all("a", class_="post-item-title")
调用BeautifulSoup对象的 find_all()
方法查找所有的 <a>
标签,并且这些标签必须有一个CSS类名为 “post-item-title”。这将返回一个包含所有匹配标签的对象列表。
最后是列表推导式,遍历上一步找到的所有符合条件的 <a>
标签。对于每个标签 link
,获取其 href
属性作为元组的第一个元素(链接地址),通过调用 link.get_text()
获取标签内的纯文本内容作为元组的第二个元素。最后将每个这样的元组添加到新的列表中。
link["href"]
:对于列表中的每个 link
对象(即 <a>
标签),访问其 “href” 属性。
link.get_text()
:同样针对每个 link
对象,调用 .get_text()
方法来获取 <a>
标签内部包含的所有文本内容。
运行结果:
import wuhu
import time
import queue
import random
import threading
# 生产者
# url_queue 存储待爬取的URL。html_queue存储已爬取到的HTML内容。
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
# 该函数将一直尝试从 url_queue 中获取新的URL进行处理。
while True:
# 从URL队列中取出一个URL。此方法会阻塞当前线程直到有可用的URL(即队列不再为空)。
url = url_queue.get()
# 执行爬取操作
html = wuhu.craw(url)
# 把爬取到的HTML内容通过 html_queue.put(html) 方法放入HTML队列中,以便其他消费者线程或后续处理流程可以读取这些数据。
html_queue.put(html)
# 打印当前线程的名字
# qsize()方法用于返回队列中的元素数量。
print(threading.current_thread().name, f"craw {url} ", "url_queue.size=", url_queue.qsize())
# 随机延时
time.sleep(random.randint(1, 2))
# 消费者
# html_queue存储已爬取到的HTML内容,文件对象 fout。
def do_parser(html_queue: queue.Queue, fout):
# 该函数将一直尝试从 html_queue 中获取新的HTML内容进行处理。
while True:
# 从HTML队列中取出爬虫抓取到的一个HTML内容。
html = html_queue.get()
# 执行解析操作
results = wuhu.parse(html)
for result in results:
# 写入到指定的输出文件 fout 中,每条记录后跟一个换行符,这样可以将每一条解析结果清晰地分隔开,便于后续的数据处理或分析。
fout.write(str(result) + "\n")
print(threading.current_thread().name, f"results.size", len(results), "html_queue.size=", html_queue.qsize())
# 随机延时
time.sleep(random.randint(1, 2))
if __name__ == '__main__':
# 创建queue.Queue()的对象
url_queue = queue.Queue()
html_queue = queue.Queue()
# 将所有的URL放入URL队列中
for url in wuhu.urls:
url_queue.put(url)
# 生产者线程
for idx in range(3):
# 其中name参数表示是线程的名字,会创建出名字分别为 "craw0"、"craw1" 和 "craw2" 的三个线程。
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
# 启动线程
t.start()
fout = open("data.txt", "w")
# 消费者线程
for idx in range(2):
t = threading.Thread(target=do_parser, args=(html_queue, fout), name=f"parse{idx}")
# 启动线程
t.start()
代码解析:
queue
模块是标准库的一部分,它提供了一种线程安全的方式来处理多线程间的生产者消费者问题。通过使用 queue
中的各种队列类,开发者可以轻松地创建缓冲区来在生产者线程和消费者线程之间传递数据,并且这些操作都是线程安全的。运行结果:
其中html_queue会有一定的波动,因为消费者是一次取一个进行消费
最后生产者提前进行结束,而消费者是以此进行处理最后为0。消费者在消费完后会卡住,直到有新的数据被生产者放入缓冲区。
输出的文件
补充:
f"https://www.cnblogs.com/#p{page}"
f"https://www.cnblogs.com/sitehome/p/{page}"
在爬虫抓取网页内容时,第一种URL格式:
f"https://www.cnblogs.com/#p{page}"
这里的哈希符号(#)表示页面内的锚点(Anchor),它仅用于浏览器内部导航,跳转到同一页面内的某个位置。对于服务器来说,#
后面的内容通常不会作为请求的一部分发送给服务器,也就是说,无论 page
变化为何值,实际上服务器收到的都是同一个请求(即不包含 #pX
部分的原始URL)。因此,在爬取过程中,即使你改变 #p
后面的页码,实际获取的还是博客首页的内容。
第二种URL格式:
f"https://www.cnblogs.com/sitehome/p/{page}"
看起来是访问一个动态生成内容的页面,其中 page
参数可能用于标识不同的页面编号。这种情况下,服务器会根据 page
参数的不同值返回不同的页面内容,所以通过这种方式可以实现对多页内容的爬取。
总结来说,第一种URL格式无法用于翻页爬取是因为哈希部分不会被服务器处理,而第二种URL格式则包含了服务器识别并响应不同页面请求所需的信息。
函数库在多线程环境中被调用时线程安全指某个函数、能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全
上图展示的是一个取钱的过程,每次取钱,先进行if判断,然后再减去金额。
余额为1000元,线程1执行到if判断完,就被切换到线程2了。 此时,线程2也进入到了if中又被切换到线程1,线程1继续执行下去,减去相应的金额,取到了钱。切换到线程2,也减去金额,取到了钱,显然就有问题了。 银行亏了600块。
用法一:try-finally模式
import threading
lock = threading.Lock()
lock.acquire() # 获取锁 其他线程就进不到下面的try中了
try:
# do something
finally:
lock.release() # 释放锁,其他线程就可以通过前面的acquire获取到锁了。
用法二:with模式(更常用)
import threading
lock = threading.Lock()
with lock:
# do something
说明:在Python中,当使用 with
语句结合锁对象(如 threading.Lock()
或 threading.RLock()
)时,不需要显式地调用 release()
方法来释放锁。这是因为 with
语句块提供了上下文管理协议的实现,它会在进入和退出代码块时自动处理资源的获取和释放。
import threading
import time
class Account:
# balance余额
def __init__(self, balance):
self.balance = balance
# account 账户,amount 所取的金额
def draw(account, amount):
if account.balance >= amount:
print(threading.current_thread().name, "取钱成功")
time.sleep(1) # 如果不加锁,这里休息1秒,每次都会出问题,因为这里会引起线程阻塞,一定会切换
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance)
else:
print(threading.current_thread().name, "取钱失败,余额不足")
if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(target=draw, name="ta", args=(account, 800))
tb = threading.Thread(target=draw, name="tb", args=(account, 800))
ta.start()
tb.start()
运行结果:
问题解决:
import threading
import time
# 实例化线程锁
lock = threading.Lock()
class Account:
# balance余额
def __init__(self, balance):
self.balance = balance
# account 账户,amount 金额
def draw(account, amount):
with lock:
# 判断当前所取的金额是否小于余额
if account.balance >= amount:
time.sleep(1) # 如果不加锁,这里休息1秒,每次都会出问题,因为这里会引起线程阻塞,一定会切换
print(threading.current_thread().name, "取钱成功")
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance)
else:
print(threading.current_thread().name, "取钱失败,余额不足")
if __name__ == '__main__':
account = Account(1000)
# 启动两个线程 分别取800块
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
tb = threading.Thread(name="tb", target=draw, args=(account, 800))
ta.start()
tb.start()
代码解释:
with lock
表示在执行此块内的代码时,首先获取锁,确保同一时刻只有一个线程能执行此段代码。这样可以确保对账户余额的操作是原子性的,不会因为多个线程同时取款而导致余额计算错误。运行结果:
首先,新建线程,然后准备就绪,等cpu调用,如果被调用,则开始运行,如果被切换,则又返回就绪状态,如果是因为io或者sleep,则进入阻塞状态,阻塞结束则又回到就绪状态,反反复复,直到执行完。之所以要采用线程池,是因为新建线程系统需要分配资源,终止线程系统需要回收资源,如果可以重用线程,则可以减去新建、终止的开销。
用法一:map函数,很简单。注意map的结果和入参是顺序对应的。
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
# pool.map()函数接受两个参数:
# - craw: 这是一个可调用对象(例如函数),它将在每个给定的任务上运行。
# - urls: 这是一个包含多个URL或其他需要craw函数处理的数据项的可迭代对象(如列表)。
# pool.map()方法会并行地将craw函数应用到urls中的每个元素上,意味着它会创建多个线程来同时处理这些任务。
# 当所有任务完成后,map方法会返回一个结果迭代器。
# results变量存储了所有任务完成后的结果,其顺序与urls中元素的顺序保持一致。
results = pool.map(craw, urls)
for result in results:
print(result)
用法二:futures模式,更强大。注意如果用as_completed顺序是不定的。
from concurrent.futures import ThreadPoolExecutor, as_completed
# with语句确保在执行完所有任务后正确关闭线程池。ThreadPoolExecutor可以根据系统资源动态调整线程的数量,用于并发执行多个任务。
with ThreadPoolExecutor() as pool:
# 列表推导式,遍历urls列表中的每一个url,对于每个url,调用pool.submit(craw, url)提交一个异步任务到线程池。submit方法会立即返回一个Future对象,这个对象代表了将来某刻才能获取到的结果。
futures = [ pool.submit(craw, url) for url in urls ]
for future in futures:
print(future.result())
for future in as_completed(futures):
print(future.result())
代码解析:
concurrent.futures
是Python标准库中用于并发编程的一个模块,它提供了一种高层级的接口,使得开发者能够更容易地执行异步任务,无论是使用线程(ThreadPoolExecutor
)还是进程(ProcessPoolExecutor
)来并发执行函数。
第一个循环等待所有任务完成并打印结果:
for future in futures:
print(future.result())
在这个循环中,通过调用每个Future
对象的result()
方法,会阻塞当前线程直到该任务完成并返回结果。然后打印出每个任务的结果。注意,此处任务的执行顺序不一定与urls列表的顺序相同,因为它们是并发执行的。
第二个循环使用as_completed
函数来异步获取结果:
for future in as_completed(futures):
print(future.result())
as_completed(futures)
返回一个迭代器,它会按照完成的顺序依次提供futures
列表中的Future
对象。所以在第二个循环中,当一个任务完成时,立刻获取并打印其结果,无需等待所有任务全部完成。这种方式相比前一个循环更加灵活,可以实现异步非阻塞的结果处理。
import concurrent.futures
import wuhu
# 生产者
# 使用with语句创建一个线程池,线程池在with语句结束后会被正确关闭。
with concurrent.futures.ThreadPoolExecutor() as pool:
# pool.map()方法会将wuhu.urls列表中的每个URL作为参数传递给wuhu.craw函数。意味着它会创建多个线程来同时处理这些任务。
htmls = pool.map(wuhu.craw, wuhu.urls)
htmls = list(zip(wuhu.urls, htmls))
for url, html in htmls:
print(url, len(html))
print("craw over")
# 消费者
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
# pool.submit(craw, url)提交一个异步任务到线程池
future = pool.submit(wuhu.parse, html)
futures[future] = url
# 按照顺序来打印结果
# for future, url in futures.items():
# print(url, future.result())
# 按照任务完成的顺序来打印结果
for future in concurrent.futures.as_completed(futures):
url = futures[future]
# 调用future.result()方法,该方法会阻塞当前线程直到该Future对象所代表的任务完成,并返回任务的结果。
# 相当于是从任务队列中选择任务进行执行,然后直到任务完成,并返回任务的结果。
print(url, future.result())
生产者代码解析:
创建线程池:
with concurrent.futures.ThreadPoolExecutor() as pool:
使用with
语句创建一个线程池,线程池在with
语句结束后会被正确关闭。线程池允许我们利用多线程技术并发执行多个任务。
执行并行任务:
htmls = pool.map(wuhu.craw, wuhu.urls)
使用pool.map()
方法将wuhu.urls
列表中的每个URL作为参数传递给wuhu.craw
函数。map()
方法会并行地对列表中的每个元素执行craw
函数,并将结果收集起来。htmls
变量将存储所有网页抓取结果,顺序与wuhu.urls
列表中的URL顺序保持一致。
将URL和抓取结果配对:
htmls = list(zip(wuhu.urls, htmls))
使用zip()
函数将原始的URL列表和抓取结果列表配对在一起,形成一个新的元组列表,每个元组包含一个URL和其对应的HTML内容。
运行结果:
消费者者代码解析:
提交任务到线程池并存储Future对象:
futures = {}
for url, html in htmls:
future = pool.submit(wuhu.parse, html)
futures[future] = url
这段代码遍历htmls
,其中htmls
是一个包含URL和HTML内容的元组列表。对每个HTML内容调用wuhu.parse()
函数并使用pool.submit()
方法提交到线程池。然后将返回的Future
对象作为键,对应的URL作为值,存入字典futures
中。Future
对象代表异步执行任务的最终结果。
等待并处理任务结果(这会导致主线程阻塞直到所有任务完成。):
for future, url in futures.items():
print(url, future.result())
遍历futures
字典的.items()
,这会返回一个包含(Future对象, URL)
的迭代器。对每个键值对,调用future.result()
方法,该方法会阻塞当前线程直到对应的任务完成,并返回任务的结果。最后,打印出URL和该任务(即解析HTML内容)的结果。
.items()
方法返回一个视图对象,该对象包含了字典中的所有键值对。每个键值对是一个元组,其中第一个元素是键,第二个元素是对应的值。使用concurrent.futures.as_completed()
等待并处理完成的任务:
for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())
这段代码遍历as_completed(futures)
返回的迭代器,该迭代器按照任务完成的顺序提供Future
对象。当某个任务完成时,从futures
字典中取出对应的URL,并调用future.result()
获取该任务的结果。最后,打印出URL和解析结果。
运行结果:
第一种方式是按照顺序的
第二种方式是不断地查看已提交的异步任务中哪些已经完成,允许按照任务完成的顺序逐个处理结果,而不是死板地按照任务提交的顺序等待所有任务完成。
Web后台服务的特点:
Web服务对响应时要求非常高,比如要求200MS返回。
Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API。
Web服务经常需要处理几万、几百万的同时请求。
使用线程池ThreadPoolExecutor的好处:
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
# Flask 微框架中创建 web 应用的基本步骤。这里的 Flask 是 Flask 框架的核心类,用于定义和配置 web 应用程序。
app = flask.Flask(__name__)
def read_file():
time.sleep(0.1)
return "file result"
def read_db():
time.sleep(0.2)
return "db result"
def read_api():
time.sleep(0.3)
return "api result"
@app.route("/")
def index():
result_file = read_file()
result_db = read_db()
result_api = read_api()
return json.dumps({
"result_file": result_file,
"result_db": result_db,
"result_api": result_api,
})
if __name__ == "__main__":
app.run()
代码解析:
app = flask.Flask(__name__)
这行代码在 Flask 框架中创建了一个 Web 应用实例。通俗来讲,就好比你正在准备建造一个网站(Web 应用),而 flask.Flask
就是一个帮助你搭建这个网站的工具箱。
__name__
是 Python 中的一个特殊变量,它表示当前运行模块的名称。在这里使用 __name__
作为参数,Flask 可以准确地知道你的应用是从哪个模块启动的,这对于正确设置应用的基本路径和资源查找非常重要。flask.Flask(__name__)
时,实际上是在调用 Flask 类的构造函数,生成一个新的 Flask 应用实例并赋值给变量 app
。这个 app
对象就像是你的整个 Web 应用的核心控制器,你可以在这个对象上定义路由规则、配置应用属性、注册蓝图等操作。浏览器查看运行结果:
可以看到运行时间是604毫秒
修改代码来实现加速
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
# Flask 微框架中创建 web 应用的基本步骤。这里的 Flask 是 Flask 框架的核心类,用于定义和配置 web 应用程序。
app = flask.Flask(__name__)
# 创建一个线程池,默认最大线程数为当前计算机的处理器核心数
pool = ThreadPoolExecutor()
# 若要自定义线程池的最大线程数,需要在括号内添加max_workers参数
# pool = ThreadPoolExecutor(max_workers=5)
def read_file():
time.sleep(0.1)
return "file result"
def read_db():
time.sleep(0.2)
return "db result"
def read_api():
time.sleep(0.3)
return "api result"
@app.route("/")
def index():
result_file = pool.submit(read_file)
result_db = pool.submit(read_db)
result_api = pool.submit(read_api)
return json.dumps({
"result_file": result_file.result(),
"result_db": result_db.result(),
"result_api": result_api.result(),
})
if __name__ == "__main__":
app.run()
运行结果:
运行时间被提升到了303毫秒。
代码解析:
@app.route("/")
这一行代码的意义是:
"/"
:表示根路径,即访问应用的首页时使用的 URL,例如 http://example.com/
。@app.route
:将紧跟在其后的函数与 URL 路径 "/"
关联起来。当用户的浏览器向应用发送请求并访问该路径时,Flask 框架会调用被装饰的函数来处理这个请求。index
函数的作用是发起三个异步任务(读取文件、数据库和 API 数据),并将这三个任务的结果合并到一个 JSON 字符串中返回给客户端。
submit
方法会返回一个 Future
对象,代表异步任务的结果。
return json.dumps({...})
这行代码的作用是将包含三个异步任务结果的字典转换成 JSON 格式的字符串,并作为 HTTP 响应的内容返回给客户端。
有了多线程threading,为什么还要用多进程multiprocessing多进程?
上图的上面展示的是一个多线程执行的过程,主要通过并行IO和CPU来提高执行速度,但是对于CPU密集型运算,即上图的下面部分,一直都需CPU计算,则线程的切换耽误时间,导致多进程反而没有多线程速度快。
补充:
- 线程上下文切换开销: 在多线程环境下,CPU需要在不同线程之间切换执行,这涉及到保存和恢复当前线程的状态(寄存器、堆栈指针等),这个过程称为上下文切换。频繁的上下文切换会导致CPU花在切换上的时间增多,而不是真正执行计算任务,特别是在线程数超过CPU核心数的情况下,这种现象尤为明显。
- 缓存未命中率增加: 多线程CPU密集型计算可能导致CPU缓存利用率下降。每个线程在执行时会加载和使用自己的数据,当线程切换时,原本线程A缓存中的数据可能不再被使用,而线程B所需的数据可能不在缓存中,这就增加了缓存未命中次数,进而导致CPU需要更多时间从内存中加载数据,影响整体性能。
- 资源共享争抢: 对于某些共享资源,如CPU缓存、内存带宽等,多个线程同时访问可能会造成资源争抢,导致CPU周期的浪费。尤其是对于那些并不能完全并行执行,而是需要顺序执行的部分代码(如临界区代码),线程间需要进行同步操作,这也引入了额外的开销。
- 线程粒度过小: 如果任务分解得非常细小,以至于线程执行时间过短,上下文切换的开销就可能超过了实际执行计算的时间,导致整体效率下降。
- 硬件限制: 单核CPU只能在一个时间点上执行一个线程,即便开启多线程,也只会通过时间片轮换来模拟并行。而在多核CPU上,如果核心数目小于线程数,依然会出现上述问题。
对比多线程和多进程的实现
单线程、多线程、多进程对比CPU密集计算速度。
这里判断100个大数 是否为素数?分别对比了单线程,多线程,多进程的效率。
import time
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 判断一个数是否为质数
def is_prime(number):
if number < 2:
return False
if number == 2:
return True
if number % 2 == 0:
return False
# 从给定的整数 number 中计算并提取其平方根的整数部分。
# sqrt 函数,计算 number 的平方根。
# math.floor 函数,这个函数的功能是向下取整。
sqrt_number = int(math.floor(math.sqrt(number)))
for i in range(3, sqrt_number + 1, 2):
if number % i == 0:
return False
return True
# 单线程
def single_thread():
for number in PRIMES:
is_prime(number)
# 多线程
def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
# 多进程
def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
if __name__ == '__main__':
PRIMES = [112272535095293] * 100
start = time.time()
single_thread()
end = time.time()
print("single_thread, cost:", end - start, "秒")
start = time.time()
multi_thread()
end = time.time()
print("multi_thread, cost:", end - start, "秒")
start = time.time()
multi_process()
end = time.time()
print("multi_process, cost:", end - start, "秒")
程序运行结果:
import json
import math
import flask
from concurrent.futures import ProcessPoolExecutor
app = flask.Flask(__name__)
def is_prime(number):
if number < 2:
return False
if number == 2:
return True
if number % 2 == 0:
return False
# 从给定的整数 number 中计算并提取其平方根的整数部分。
# sqrt 函数,计算 number 的平方根。
# math.floor 函数,这个函数的功能是向下取整。
sqrt_number = int(math.floor(math.sqrt(number)))
for i in range(3, sqrt_number + 1, 2):
if number % i == 0:
return False
return True
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
number_list = [int(x) for x in numbers.split(",")]
results = process_pool.map(is_prime, number_list)
return json.dumps(dict(zip(number_list, results)))
if __name__ == "__main__":
# 创建一个线程池,默认最大线程数为当前计算机的处理器核心数
# 在多进程中需要定义在main函数中
process_pool = ProcessPoolExecutor()
app.run()
代码解析:
number_list = [int(x) for x in numbers.split(",")]
这段代码将请求路径中的 numbers
参数(如 “1,2,3,4,5”)按逗号分割成字符串列表,然后将每个字符串转换成整数,存储到 number_list
中。
results = process_pool.map(is_prime, number_list)
process_pool.map()
方法接受一个函数 is_prime
和一个输入列表 number_list
,并行地将 is_prime
函数应用于列表中的每个元素。这意味着所有的质数检查将在多个进程中并发执行,显著提高处理大量数据时的速度。
return json.dumps(dict(zip(number_list, results)))
zip(number_list, results)
会将原来的数字列表与质数检查结果列表配对,生成一个元组列表,然后将其转换为字典dict()
,其中字典的键是原始数字,值是它们是否为质数的布尔结果。
json.dumps()
函数将这个字典转换为 JSON 格式的字符串,以便作为 HTTP 响应返回给客户端。客户端收到响应后,可以根据 JSON 数据了解每个数字是否为质数。
运行结果:
点击控制台中的链接
浏览器的URL路径中添加/is_prime/1,2,3,4,5。并且可以看到运行时常是8ms
单线程爬虫 cpu的执行情况,可以发现,经常因为等待IO而影响CPU的执行效率。
协程主要是在单线程内实现的,以爬虫为例,协程先是让cpu爬取第一个url的内容,等待IO的时候,它又让CPU爬取第二个url的内容,当第二个任务等待IO的时候,它又让CPU爬取第三个url的内容,然后第三个任务等待IO, 它又循环回来,执行第一个任务,就这样返回循环。 所以协程就是大循环。
async关键字说明函数是一个协程。
await关键字对应IO,表示程序执行到这里的时候不进行阻塞,让超级循环直接进入下一次的循环。
注意:
要用在异步IO编程中,依赖的库必须支持异步IO特性。
爬虫引用中:request不支持异步,需要用aiohttp。
import time
import wuhu
import asyncio
import aiohttp
# 定义了一个异步函数async_craw,它接受一个参数url,这个函数用于异步地抓取指定 URL 的网页内容。
async def async_craw(url):
print("craw url: ", url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
result = await response.text()
print(f"craw url: {url}, {len(result)}")
loop = asyncio.get_event_loop()
# 使用协程函数来定义tasks列表
tasks = [
loop.create_task(async_craw(url))
for url in wuhu.urls
]
start = time.time()
# 等待tasks的完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end - start)
代码解析:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
创建一个 aiohttp.ClientSession
对象。在异步编程中,async with
语句表示开始进入一个异步上下文管理器。在这个例子中,相当于告诉 Python:“我要开始使用一个新的网络会话了,并且要以异步的方式(非阻塞地等待响应)来进行操作。”
使用 session.get()
方法发送一个GET请求到指定的 url
,这是一个异步操作,因此使用了 async with
语句来等待请求完成并获取响应对象 response
。response
对象包含了服务器返回的所有相关信息,如状态码、头信息以及主体内容等。
result = await response.text():
当前执行到这一行时,程序会暂停当前协程的执行(即挂起),直到 response.text()
执行完毕并返回响应体的文本内容。一旦 response.text()
的结果准备就绪,await
关键字将恢复协程的执行,并将这个文本内容赋值给变量 result
。
loop = asyncio.get_event_loop()
获取当前运行环境的事件循环(Event Loop)。在 asyncio 中,事件循环是整个异步 I/O 的核心,它负责调度协程、处理网络 I/O 事件以及调用相关的回调函数。
在Python的异步编程框架asyncio中,
loop = asyncio.get_event_loop()
这行代码是用来获取当前线程的事件循环(Event Loop)实例。通俗解释:
想象一下经营一个繁忙的咖啡厅。这个咖啡厅有一个经理(即事件循环),他负责协调所有服务员(即协程/任务)的工作流程。当有新顾客(即网络请求、文件I/O等事件)到达时,经理会安排合适的服务员去服务这位顾客。而当服务员完成任务并返回结果时,经理又会根据优先级和规则来调度下一个任务。
asyncio.get_event_loop()
就是找到或创建这样一个“经理”,它是整个异步程序的核心控制器,负责管理并发执行的任务,并按照一定的策略(如FIFO、LIFO或其他自定义策略)调度这些任务,确保它们能够高效、有序地运行。当你需要开始执行任何异步操作时,首先需要拿到这个“经理”——事件循环来进行管理和控制。
tasks = [
loop.create_task(async_craw(url))
for url in wuhu.urls
]
这段代码通过列表推导式遍历 wuhu.urls
中的所有 URL,并对每个 URL 创建一个异步任务。
loop
是 asyncio 的事件循环,它是所有异步操作的核心调度器create_task()
方法是事件循环提供的接口之一,用于将传入的协程对象封装成一个可以被事件循环管理的任务对象(Task)。loop.create_task(async_craw(url))
时,实际上创建了一个新的任务,并将 async_craw(url)
协程与给定的 url
参数关联起来,准备加入事件循环执行队列。loop.run_until_complete(asyncio.wait(tasks))
asyncio.wait()
函数接收这个任务列表作为参数,并返回两个值:一个是已完成任务的集合(done
),另一个是尚未完成的任务的集合(pending
)。此函数是非阻塞的,它会立即返回这两个集合,并不等待所有任务都完成。loop.run_until_complete()
会启动并驱动关联的事件循环,直到给定的协程或 Future 完成为止。这意味着该方法将一直运行,直到 asyncio.wait(tasks)
中的所有任务要么完成,要么发生错误。首先通过列表推导式创建了一系列异步任务放入 tasks
列表中。然后调用 asyncio.wait(tasks)
来监视这些任务的完成状态,并不会阻止事件循环处理其他可能同时发生的事件。最后使用 loop.run_until_complete(asyncio.wait(tasks))
来确保事件循环会一直运行,直到 tasks
中至少有一个任务完成。一旦有任务完成,或者发生了异常,这个方法就会结束运行并返回结果。然后你可以根据返回的 done
和 pending
集合来进一步处理已成功和未完成的任务。
程序运行结果:
在Python中,信号量用于控制对有限资源的访问。它维护了一个计数器,可以用来限制同时访问特定资源的线程或进程的数量。
信号量(英语:Semaphore)又称为信号量、旗语是一个同步对象,用于保持在0至指定最大值之间的一个计数值。
信号量是用来控制并发度的。
主要有两种实现方式:
方式一:
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource
方式二:
sem = asyncio.Semaphore(10)
# ... later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()
import time
import wuhu
import asyncio
import aiohttp
# 创建一个计数信号量实例,初始化时计数值为10。
semaphore = asyncio.Semaphore(10)
async def async_craw(url):
async with semaphore:
# 在这里执行需要限制并发数的操作,如网络请求
print("craw url: ", url)
async with aiohttp.ClientSession() as session:
# 使用session.get()方法发送一个GET请求到指定的url,这是一个异步操作,
# 因此使用了async with语句来等待请求完成并获取响应对象response。
# response对象包含了服务器返回的所有相关信息,如状态码、头信息以及主体内容等。
async with session.get(url) as response:
result = await response.text()
await asyncio.sleep(5)
print(f"craw url: {url}, {len(result)}")
# 在asyncio中,事件循环是整个异步I/O的核心,负责调度协程、处理网络 I/O 事件以及调用相关的回调函数。也就是超级循环
loop = asyncio.get_event_loop()
# 使用协程函数来定义tasks列表
tasks = [
# 当执行该语句的时候,实际上创建了一个新的任务,将async_craw(url)协程与给定的url参数关联起来,准备加入事件循环执行队列。
loop.create_task(async_craw(url))
for url in wuhu.urls
]
start = time.time()
# 等待tasks的完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end - start)
代码解析:
semaphore = asyncio.Semaphore(10)
创建一个计数信号量(Counting Semaphore)实例,初始化时计数值为10。这个信号量可以用于限制并发执行的任务数量,例如限制同时进行的网络请求、文件读写或其他需要保护资源的操作的数量。如果当前信号量计数值大于0,则计数减1并允许代码块继续执行;若计数为0,则等待直到有其他任务释放信号量。
想象你正在经营一家只能容纳10个人的小餐厅,每次有顾客想进来就餐时,都需要先敲门(获取信号量)。如果餐厅里已经有10个人在用餐了(即信号量计数为0),那么新来的顾客就必须在外面等待,直到有人吃完离开餐厅并空出位置(释放信号量)。
async with semaphore:
在 async with
代码块内部,可以进行需要受信号量保护的操作,以确保同时执行这些操作的任务数量不超过信号量初始化时设置的计数值。
运行结果:
前10个网页爬取结束后才会进入后面网页的爬取。并且在时间上也会更长。
主要是防止爬虫将目标网站爬坏,超出了网站的处理能力。