A thread is the smallest unit that the CPU schedules; a process is the smallest unit to which the operating system allocates resources (CPU, memory, and so on).
Running two tasks one after another in a single thread:
import time
def work_1():
    print('Task 1...')
    time.sleep(2)
def work_2():
    print('Task 2...')
    time.sleep(2)
start = time.time()
work_1()
work_2()
end = time.time()
print(f'Total time: {end - start} s')
Task 1...
Task 2...
Total time: 4.020084857940674 s
work_2() can only start after work_1() has finished, so the two tasks take about 4 seconds in total. With the threading module the two tasks can run concurrently:
import time
import threading
def work_1():
    print('Task 1...')
    time.sleep(2)
def work_2():
    print('Task 2...')
    time.sleep(2)
# Create thread objects with the Thread class; target binds the task each thread will run
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
# Run the threads
start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print(f'Total time: {end - start} s')
Task 1...
Task 2...
Total time: 2.0165793895721436 s
work_1() and work_2() run concurrently, so the total time drops to about 2 seconds.
The "randomness" of CPU thread scheduling
import time
import threading
def work_1():
    for i in range(5):
        print('Task 1...')
        time.sleep(2)
def work_2():
    for i in range(5):
        print('Task 2...')
        time.sleep(2)
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
t1.start()
t2.start()
Task 1...
Task 2...
Task 2...Task 1...
Task 1...Task 2...
Task 2...
Task 1...
Task 2...
Task 1...
The exact order in which "Task 1..." and "Task 2..." interleave differs from run to run; it is determined by the CPU's scheduling algorithm.
How a Python program is run
When a Python file is executed by the interpreter, the operating system creates a process for it, and the code in the file runs in that process's main thread. Each t = threading.Thread() that is started creates a new thread, called a child thread. The threads are scheduled by the CPU and run concurrently; exactly how they are scheduled is decided by the operating system's scheduling algorithm.
thread_object.start()
t = threading.Thread() only creates a thread object; it does not run the thread's code.
t.start() moves thread t into the ready state, where it waits to be scheduled by the CPU; exactly when it actually runs is decided by the scheduler.
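To make this concrete, here is a minimal sketch (the work function is just for illustration) that uses threading.main_thread() and threading.enumerate() to show the main thread plus a started child thread; note that start() returns immediately while the child thread is still sleeping:
import time
import threading

def work():
    time.sleep(1)

t = threading.Thread(target=work)
t.start()  # returns immediately; the child thread is now ready/running

# The process contains the main thread plus the child thread we just started
print(threading.main_thread().name)               # MainThread
print([th.name for th in threading.enumerate()])  # e.g. ['MainThread', 'Thread-1']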
Taking the concurrent-task code above as an example, first comment out t1.join() and t2.join():
import time
import threading
def work_1():
    print('Task 1...')
    time.sleep(2)
def work_2():
    print('Task 2...')
    time.sleep(2)
# Create thread objects with the Thread class; target binds the task each thread will run
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
# Run the threads
start = time.time()
t1.start()
t2.start()
# t1.join()
# t2.join()
end = time.time()
print(f'Total time: {end - start} s')
Task 1...
Task 2...
Total time: 0.0009987354278564453 s
When the main thread reaches end = time.time(), end records the moment the main thread executed that line, so print(f'Total time: {end - start} s') reports only 0.0009987354278564453 s. At that point the main thread has reached the end of the file; with no more code to run, it waits for the child threads to finish and then the process exits.
thread_object.join()
t.join() makes the main thread wait for child thread t; only after the child thread's task has finished does the main thread continue. Restoring t1.join() and t2.join():
import time
import threading
def work_1():
    print('Task 1...')
    time.sleep(2)
def work_2():
    print('Task 2...')
    time.sleep(2)
# Create thread objects with the Thread class; target binds the task each thread will run
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
# Run the threads
start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print(f'Total time: {end - start} s')
Task 1...
Task 2...
Total time: 2.008962392807007 s
thread_object.setDaemon()
setDaemon(True) marks a thread as a daemon thread: when the main thread exits, daemon threads are terminated immediately instead of being waited on. (Since Python 3.10, setDaemon() is deprecated; set t.daemon = True before start() instead.) First, without the daemon flag:
import time
import threading
def work():
    for i in range(5):
        print(i)
        time.sleep(1)
t = threading.Thread(target=work)
# t.setDaemon(True)
t.start()
print('Main thread is about to exit...')
0
Main thread is about to exit...
1
2
3
4
Now with the daemon flag set:
import time
import threading
def work():
    for i in range(5):
        print(i)
        time.sleep(1)
t = threading.Thread(target=work)
t.setDaemon(True)  # equivalent to t.daemon = True
t.start()
print('Main thread is about to exit...')
0
Main thread is about to exit...
Because the child thread is now a daemon, it is killed as soon as the main thread exits, so the remaining numbers are never printed.
threading.current_thread()
threading.current_thread() returns a reference to the thread object that is currently executing:
import threading
def work():
    name = threading.current_thread().name  # or .getName(), now deprecated
    print(name)
for i in range(5):
    t = threading.Thread(target=work)
    t.name = f'Thread-{i}'  # or t.setName(f'Thread-{i}'), now deprecated
    t.start()
Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
A practical example: downloading a few images. First, one after another in a single thread:
import requests
def get_image(url):
    response = requests.get(url).content
    file_name = url.split('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response)
    print('Download complete...')
url_list = [
    'http://pic.bizhi360.com/bbpic/98/10798.jpg',
    'http://pic.bizhi360.com/bbpic/92/10792.jpg',
    'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for img_url in url_list:
    get_image(img_url)
The same downloads, using one thread per URL:
import requests
import threading
def get_image(url):
    response = requests.get(url).content
    file_name = url.split('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response)
    print('Download complete...')
url_list = [
    'http://pic.bizhi360.com/bbpic/98/10798.jpg',
    'http://pic.bizhi360.com/bbpic/92/10792.jpg',
    'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for img_url in url_list:
    t = threading.Thread(target=get_image, args=(img_url,))
    t.start()
And once more, using one process per URL (the __main__ guard is required when spawning processes):
import requests
import multiprocessing
def get_image(url):
    response = requests.get(url).content
    file_name = url.split('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response)
    print('Download complete...')
url_list = [
    'http://pic.bizhi360.com/bbpic/98/10798.jpg',
    'http://pic.bizhi360.com/bbpic/92/10792.jpg',
    'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
if __name__ == '__main__':
    for img_url in url_list:
        p = multiprocessing.Process(target=get_image, args=(img_url,))
        p.start()
The GIL
Python has a global interpreter lock, the GIL.
The GIL is specific to the CPython interpreter; its main effect is that within one process, only one thread executes Python bytecode at any given moment.
So even if a process creates multiple threads, only one of them runs at a time while the others wait to be scheduled, which means multithreading cannot take advantage of multiple CPU cores.
To get around the GIL you can use multiple processes instead, each with its own main thread, but processes consume more resources than threads.
As a rule of thumb: prefer multithreading for I/O-bound tasks and multiprocessing for CPU-bound tasks.
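A quick way to observe this is to time a pure-Python CPU-bound loop run serially and then in two threads; a minimal sketch (the count_down function and the value of N are just illustrative, and the exact timings depend on your machine):
import time
import threading

def count_down(n):
    # pure-Python CPU-bound loop
    while n > 0:
        n -= 1

N = 10_000_000

# Serial: run the loop twice in the main thread
start = time.time()
count_down(N)
count_down(N)
print(f'serial:  {time.time() - start:.2f} s')

# Two threads: the GIL lets only one thread execute bytecode at a time
start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f'threads: {time.time() - start:.2f} s')
On CPython the two-thread version is usually no faster, and often slightly slower, because the GIL serializes the bytecode execution.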
A separate problem with threads is unsynchronized access to shared data. In the following example two threads modify the same global counter:
import threading
def add_():
    global counter
    for i in range(1000000):
        counter += 1
def sub_():
    global counter
    for i in range(1000000):
        counter -= 1
counter = 0
t1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
-12078
Thread t1 increments counter 1,000,000 times and thread t2 decrements it 1,000,000 times, so the expected output is 0. The actual output, however, is a strange number such as -12078, and repeated runs produce different results. The reason becomes clear at the machine level, where counter += 1 corresponds to roughly three instructions:
mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c
counter lives at memory address 0x8049a1c. The first mov instruction loads the value at that address into the eax register, then 1 (0x1) is added to eax, and finally the value of eax is stored back to the same memory address.
Suppose thread t1 runs this code: it loads the value of counter (assume it is 0 at this point) into its register eax, so t1's eax = 0; it then adds 1 to the register, so eax = 1. At that moment a timer interrupt occurs, and the operating system saves the state of the running thread (its program counter and registers, including eax) into t1's TCB (thread control block).
Thread t2 is then scheduled to run and enters the following code:
mov 0x8049a1c, %eax
sub $0x1, %eax
mov %eax, 0x8049a1c
Thread t2 executes the first instruction, loading counter from memory into its own eax (each thread has its own register state); counter is still 0, so t2's eax = 0. Suppose t2 then executes the next two instructions, decrementing eax (so eax = -1) and storing eax back to counter (address 0x8049a1c); the global counter is now -1.
Then another context switch occurs and thread t1 resumes. It has already executed the mov and add instructions and is about to execute the final mov; its eax = 1, so the final mov stores that value to memory and counter is set to 1.
In short, one increment and one decrement were executed, yet counter ended up incremented by 1 instead of the expected counter = 0.
The way the system schedules threads cannot be controlled at the code level.
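The same non-atomicity is visible in CPython bytecode; a minimal sketch using the standard dis module (the exact opcodes vary between Python versions, but there are always separate load / add / store steps):
import dis

# Disassemble the augmented assignment; a thread switch can happen
# between any two of the resulting bytecode instructions.
dis.dis("counter += 1")
Because a thread can be preempted between these steps, the shared update has to be guarded with a lock, which is what the next example does.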
from threading import Thread, RLock
lock_obj = RLock()
def add_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        counter += 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
def sub_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        counter -= 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
counter = 0
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
0
The final value of counter is now 0.
RLock objects support the context-manager protocol, so a with statement can acquire and release the lock for us:
from threading import Thread, RLock
lock_obj = RLock()
def add_():
    global counter
    for i in range(1000000):
        with lock_obj:
            counter += 1
def sub_():
    global counter
    for i in range(1000000):
        with lock_obj:
            counter -= 1
counter = 0
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
0
RLock vs Lock
Lock
Lock is a plain (non-reentrant) synchronization lock: the same thread cannot acquire it a second time before releasing it, so nested acquisition is not supported:
from threading import Thread, Lock
lock_obj = Lock()
def add_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        lock_obj.acquire()  # second acquire by the same thread
        counter += 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
        lock_obj.release()
def sub_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        lock_obj.acquire()  # second acquire by the same thread
        counter -= 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
        lock_obj.release()
counter = 0
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
When the same thread tries to acquire the Lock twice, a deadlock occurs and the program hangs.
RLock
RLock is a recursive (reentrant) lock and supports nested acquisition by the same thread:
from threading import Thread, RLock
lock_obj = RLock()
def add_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        lock_obj.acquire()  # nested acquire by the same thread is allowed with RLock
        counter += 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
        lock_obj.release()
def sub_():
    global counter
    for i in range(1000000):
        lock_obj.acquire()  # acquire the lock; other threads must wait until it is released
        lock_obj.acquire()  # nested acquire by the same thread is allowed with RLock
        counter -= 1
        lock_obj.release()  # release the lock; waiting threads can then proceed
        lock_obj.release()
counter = 0
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
0
Some built-in operations are effectively atomic under the GIL; for example, list.append() can be called from multiple threads without an explicit lock:
import threading
data_list = list()
def list_add():
    for _ in range(100000):
        data_list.append(i)
    print(len(data_list))
for i in range(2):
    t = threading.Thread(target=list_add)
    t.start()
100000
200000
Deadlock: if each of two threads holds one lock and waits for the lock held by the other, both block forever:
import threading
import time
mutex_1 = threading.Lock()
mutex_2 = threading.Lock()
class MyThread_1(threading.Thread):
    def run(self):
        # Lock mutex_1
        mutex_1.acquire()
        # After locking mutex_1, sleep 1 second so that thread 2 can lock mutex_2
        print(self.name + '---mutex_1-up---')
        time.sleep(1)
        # Blocks here, because mutex_2 has already been locked by thread 2
        mutex_2.acquire()
        print(self.name + '---mutex_2-up---')
        # Unlock mutex_2
        mutex_2.release()
        # Unlock mutex_1
        mutex_1.release()
class MyThread_2(threading.Thread):
    def run(self):
        # Lock mutex_2
        mutex_2.acquire()
        # After locking mutex_2, sleep 1 second so that thread 1 can lock mutex_1
        print(self.name + '---mutex_2-up---')
        time.sleep(1)
        # Blocks here, because mutex_1 has already been locked by thread 1
        mutex_1.acquire()
        print(self.name + '---mutex_1-up---')
        # Unlock mutex_1
        mutex_1.release()
        # Unlock mutex_2
        mutex_2.release()
if __name__ == '__main__':
    t1 = MyThread_1()
    t2 = MyThread_2()
    t1.start()
    t2.start()
Thread-1---mutex_1-up---
Thread-2---mutex_2-up---
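One common way to avoid this kind of deadlock is to always acquire the locks in the same order, or to acquire the second lock with a timeout and back off; a minimal sketch of the timeout approach (the transfer function and the 1-second timeout are just illustrative):
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def transfer():
    # Keep retrying until both locks are held at the same time.
    while True:
        lock_a.acquire()
        # Try the second lock for at most 1 second instead of blocking forever
        if lock_b.acquire(timeout=1):
            try:
                print('got both locks')
            finally:
                lock_b.release()
                lock_a.release()
            return
        # Could not get lock_b: release lock_a, back off, and retry
        lock_a.release()

threading.Thread(target=transfer).start()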
Thread pools: concurrent.futures.ThreadPoolExecutor
A thread pool keeps a fixed number of worker threads; submit() hands a function (and its arguments) to the pool to be run on one of them:
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
# Submit the functions to be executed to the pool with submit()
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)
Successfully obtained page 1...
Successfully obtained page 2...
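The pool can also be used as a context manager, which shuts it down (waiting for submitted tasks) when the block exits; a minimal sketch reusing the get_html function above:
import time
from concurrent.futures import ThreadPoolExecutor

def get_html(page):
    time.sleep(1)
    return page

# The with block calls executor.shutdown() automatically on exit
with ThreadPoolExecutor(max_workers=2) as executor:
    task = executor.submit(get_html, 1)
print(task.result())  # 1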
done()
The done() method reports whether a task has finished:
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
# Submit the functions to be executed to the pool with submit()
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)
# done() reports whether a given task has finished
print(f'task_1 done: {task_1.done()}')
Successfully obtained page 1...
Successfully obtained page 2...
task_1 done: False
cancel()
The cancel() method cancels a task that has not started running yet; a task that is already running cannot be cancelled:
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object (only one worker, so task_2 has to wait)
executor = ThreadPoolExecutor(max_workers=1)
# Submit the functions to be executed to the pool with submit()
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)
# cancel() cancels a task that has not started running; running tasks cannot be cancelled
print(f'task_2 cancelled: {task_2.cancel()}')
Successfully obtained page 1...
task_2 cancelled: True
Here max_workers was changed to 1 so that task_2 is cancelled before it ever gets a chance to run.
result()
submit() returns a Future object; calling result() on the Future retrieves the task's return value (blocking until it is available):
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
# Submit the functions to be executed to the pool with submit()
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)
# Call result() on the future objects to get the tasks' return values
print(f'task_1 result: {task_1.result()}')
print(f'task_2 result: {task_2.result()}')
Successfully obtained page 1...
Successfully obtained page 2...
task_1 result: 1
task_2 result: 2
as_completed()
as_completed() yields futures as their tasks finish, so results are retrieved in completion order rather than submission order:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
page_list = [1, 2, 3, 4]
# Submit the tasks in a batch
all_tasks = [executor.submit(get_html, page) for page in page_list]
# As soon as a task finishes, its return value can be retrieved
for future in as_completed(all_tasks):
    data = future.result()
    print(f'Get data {data}')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
Get data 2
Get data 1
Get data 4
Get data 3
map()
executor.map() submits the tasks and yields their return values in the same order as the input iterable:
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
page_list = [1, 2, 3, 4]
# map() submits the tasks and yields the finished tasks' return values
for data in executor.map(get_html, page_list):
    print(f'Get data {data}')  # the results come back in the same order as page_list
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
Get data 1
Get data 2
Get data 3
Get data 4
wait()
wait() blocks the calling (main) thread until the specified tasks have completed, after which the main thread continues:
import time
from concurrent.futures import ThreadPoolExecutor, wait
def get_html(page):
    print(f'Successfully obtained page {page}...')
    time.sleep(1)
    return page
# Create the thread pool object
executor = ThreadPoolExecutor(max_workers=2)
page_list = [1, 2, 3, 4]
# Submit the tasks in a batch
all_tasks = [executor.submit(get_html, page) for page in page_list]
# wait() blocks the main thread until the given tasks are done
wait(all_tasks)
print('Main thread unblocked, running the remaining code')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
Main thread unblocked, running the remaining code
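wait() also takes a return_when argument; for example, FIRST_COMPLETED makes it return as soon as any one task finishes. A minimal sketch (the per-page delays are just illustrative):
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def get_html(page):
    time.sleep(page)
    return page

executor = ThreadPoolExecutor(max_workers=2)
tasks = [executor.submit(get_html, page) for page in (1, 2)]

# Block only until the first task finishes; wait() returns the sets of
# futures that are already done and those still pending.
done, not_done = wait(tasks, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))  # typically: 1 1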
The Thread class
A thread can also be defined by subclassing Thread and overriding its run() method, which is the task the thread executes:
import requests
import threading
class ThreadSpider(threading.Thread):
    def __init__(self, url):
        super().__init__()
        self.url = url
    def run(self):
        response = requests.get(self.url).content
        file_name = self.url.split('/')[-1]
        with open(file_name, 'wb') as f:
            f.write(response)
        print('Download complete...')
url_list = [
    'http://pic.bizhi360.com/bbpic/98/10798.jpg',
    'http://pic.bizhi360.com/bbpic/92/10792.jpg',
    'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for img_url in url_list:
    thread_spider = ThreadSpider(img_url)
    thread_spider.start()
Download complete...
Download complete...
Download complete...