python中的Quene使用方法,包含多线程和多进程

发布时间:2024年01月13日

在Python中,队列(Queue)是一种抽象的数据类型,它遵循先进先出(FIFO)的原则。队列是一种特殊的线性表,只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。

Python标准库中的queue模块提供了多种队列的实现,包括:

  1. Queue:这是一个简单的队列类,可以用来实现先进先出的数据结构。
  2. LifoQueue:这是一个后进先出(LIFO)的数据结构,与栈类似。
  3. PriorityQueue:这是一个优先级队列,可以根据元素的优先级进行排序。

下面是一个使用queue.Queue的简单示例:

import queue

# 创建一个队列
q = queue.Queue()

# 向队列中添加元素
for i in range(5):
    q.put(i)

# 从队列中获取元素,它会按照先进先出的原则返回元素
while not q.empty():
    print(q.get())

输出:

0
1
2
3
4

queue.Queue还提供了其他一些有用的方法,如q.full()(检查队列是否已满)、q.maxsize`(获取队列的最大大小)等。此外,它还支持线程安全,可以在多线程环境中安全地使用。

属性和方法以及用法

  • qsize():返回队列中的元素个数。
import queue

# 创建一个队列
q = queue.Queue()

# 向队列中添加元素
q.put("item1")
q.put("item2")
q.put("item3")

# 获取队列中的元素个数
size = q.qsize()
print("队列大小:", size)

运行结果:
在这里插入图片描述

  • mutex:返回队列的锁对象,用于多线程环境中的同步操作。通过使用mutex属性,可以确保在多线程访问队列时,对队列的操作是线程安全的。代码如下:
import queue
import threading

# 创建一个队列和锁对象
q = queue.Queue()
mutex = q.mutex


# 定义一个线程函数,用于向队列中添加元素
def add_item(item):
    with mutex:
        q.put(item)

    # 定义一个线程函数,用于从队列中获取元素


def get_item():
    with mutex:
        item = q.get()
        return item

    # 创建两个线程,分别执行添加和获取操作


thread1 = threading.Thread(target=add_item, args=("item1",))
thread2 = threading.Thread(target=get_item)

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()
  • not_full():如果队列未满,则返回True,否则返回False。
  • maxsize(可选参数,默认为0,用于设定队列长度。maxsize小于等于0表示队列长度无限。)
  • put(item, block=True, timeout=None)(在队尾插入一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有空间可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列已满且block为False,将引发Full异常。)
import queue  
  
# 创建一个队列  
q = queue.Queue()  
  
# 向队列中添加元素  
for i in range(5):  
    q.put(i)  
  
# 获取队列的大小  
size = q.qsize()  
print("队列大小:", size)  
  
# 从队列中获取元素并打印  
while not q.empty():  
    item = q.get()  
    print(item)
  • get(block=True, timeout=None)(从队头删除并返回一个项目。如果block参数为True(默认值),且队列为空,该方法会阻塞直到有项目可用。如果timeout参数提供,该方法在等待时有超时限制。如果队列为空且block为False,将引发Empty异常。)
  • empty()(检查队列是否为空,返回True如果队列为空,否则返回False。)
import queue

# 创建一个队列
q = queue.Queue()
# 检查队列是否为空
if q.empty():
    print("队列为空")
else:
    print("队列非空")

# 向队列中添加元素
q.put("item1")
q.put("item2")

# 检查队列是否为空
if q.empty():
    print("队列为空")
else:  
    print("队列非空")

  • full()(检查队列是否已满。如果队列已满,返回True,否则返回False。)
import queue

# 创建一个队列,容量为3
q = queue.Queue(maxsize=3)

# 向队列中添加元素
for i in range(5):
    try:
        q.put(i, block=False)
    except queue.Full:
        print("队列已满,添加元素失败")

# 检查队列是否已满
if q.full():
    print("队列已满")
else:
    print("队列未满")

运行结果:
在这里插入图片描述

  • get_nowait():与get()方法类似,但不进行阻塞等待。如果队列为空,将引发Empty异常。
import queue

q = queue.Queue()

try:
    item = q.get_nowait()
except queue.Empty:
    print("队列为空,无法获取元素。")

在这里插入图片描述

  • put_nowait():与put()方法类似,但不进行阻塞等待。如果队列已满,将引发Full异常。
import queue

q = queue.Queue(maxsize=3)

try:
    q.put_nowait("item")
    q.put_nowait("item1")
    q.put_nowait("item2")
    q.put_nowait("item3")

except queue.Full:
    print("队列已满,无法添加元素。")

在这里插入图片描述

  • join():Queue.join() 是 Python 的 queue 模块中的一个方法,用于阻塞当前线程,直到队列中的所有任务都已完成处理。

当你使用多线程或多进程从队列中获取并处理任务时,Queue.join() 方法可以确保主线程等待所有任务都已完成后再继续执行。这样可以避免因任务未完成而导致主线程提前结束。

多线程中的Quene

task_done() 是 Python 的 queue 模块中的一个方法,用于标记队列中的一个任务已经完成。

当多个线程或进程同时从队列中获取任务并处理时,可以使用 task_done() 方法来通知队列中已经完成了一个任务。这有助于确保队列中的任务能够被正确地处理,并且可以避免出现死锁或阻塞的情况。

代码如下:

import queue
import threading
import time

# 创建一个队列
q = queue.Queue()


# 定义一个生产者线程函数,将数据添加到队列中
def producer(thread_id):
    for i in range(5):
        print(f"生产者 {thread_id} 生产了 {i}")
        q.put(i)
        time.sleep(1)

    # 定义一个消费者线程函数,从队列中获取数据并处理


def consumer(thread_id):
    while True:
        item = q.get()
        print(f"消费者 {thread_id} 消费了 {item}")
        time.sleep(1)
        q.task_done()

    # 创建生产者线程


threads = []
for i in range(2):
    t = threading.Thread(target=producer, args=(i,))
    threads.append(t)
    t.start()

# 创建消费者线程
for i in range(3):
    t = threading.Thread(target=consumer, args=(i,))
    threads.append(t)
    t.start()

# 等待所有任务完成
q.join()

运行结果:
在这里插入图片描述

如果使用get_nowait,应该怎么写呢?代码如下:

import queue
import threading
import time

# 创建一个队列
q = queue.Queue()


# 定义一个生产者线程函数,将数据添加到队列中
def producer(thread_id):
    for i in range(5):
        print(f"生产者 {thread_id} 生产了 {i}")
        q.put(i)
        time.sleep(1)

    # 定义一个消费者线程函数,从队列中获取数据并处理


def consumer(thread_id):
    while True:
        try:
            item = q.get_nowait()
            print(f"消费者 {thread_id} 消费了 {item}")
            time.sleep(1)
        except queue.Empty:
            print("队列为空,无法获取元素。")
            pass
        time.sleep(1)


    # 创建生产者线程


threads = []
for i in range(2):
    t = threading.Thread(target=producer, args=(i,))
    threads.append(t)
    t.start()

# 创建消费者线程
for i in range(3):
    t = threading.Thread(target=consumer, args=(i,))
    threads.append(t)
    t.start()

# 等待所有任务完成
q.join()

在这里插入图片描述

多进程中的Quene

在Python的multiprocessing模块中,Queue是一个进程安全的队列类,它允许在多个进程之间进行安全的通信。这个队列是基于Python标准库中的queue模块实现的,并添加了多进程的支持。

多进程存,主线程取

import multiprocessing
import time

def worker(q, num):
    """ Worker function that puts data into the queue. """
    for i in range(5):
        print(f"Worker {num} produced {i}")
        q.put(i)
    q.put("STOP")  # 用于告诉主进程所有工作已经完成

if __name__ == "__main__":
    q = multiprocessing.Queue()  # 创建一个队列对象
    processes = []
    for i in range(3):  # 创建3个工作进程
        p = multiprocessing.Process(target=worker, args=(q, i))
        processes.append(p)
        p.start()
    
    while True:
        item = q.get()  # 从队列中获取数据
        if item == "STOP":  # 如果获取到"STOP",则所有工作进程已经完成
            break
        print(f"Main process consumed {item}")
    
    for p in processes:  # 等待所有工作进程结束
        p.join()

运行结果:
在这里插入图片描述

在这个示例中,我们创建了一个Queue对象q,并创建了三个工作进程。每个工作进程将数据添加到队列中,主进程从队列中获取数据并处理。当工作进程完成所有任务后,它会在队列中放置一个"STOP"标记,主进程通过检查这个标记来知道所有工作进程已经完成。最后,主进程等待所有工作进程结束。

注意,在使用multiprocessing.Queue时,你需要确保传递给子进程的队列对象是通过multiprocessing.Queue()创建的,而不是通过标准库中的queue.Queue()创建的。因为标准库中的queue.Queue不是线程安全的,也不支持多进程。

多进程存,多进程取

import multiprocessing
import time


def worker(q, num):
    for i in range(5):
        print(f"Worker {num} produced {i}")
        q.put(i)

def get_item(q):
    while True:
        item = q.get()  # 从队列中获取数据
        print(f"get process consumed {item}")


if __name__ == "__main__":
    q = multiprocessing.Queue()  # 创建一个队列对象
    processes = []
    for i in range(3):  # 创建3个工作进程
        p = multiprocessing.Process(target=worker, args=(q, i))
        processes.append(p)
        p.start()

    for i in range(3):  # 创建3个工作进程
        p = multiprocessing.Process(target=get_item,args=(q,))
        processes.append(p)
        p.start()


    for p in processes:  # 等待所有工作进程结束
        p.join()

在这里插入图片描述

文章来源:https://blog.csdn.net/hhhhhhhhhhwwwwwwwwww/article/details/135569593
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。