PySide6多线程处理yolov5目标检测

发布时间:2024年01月07日

两种多线程处理方法(都是把摄像头采集和YOLOv5推理放到后台线程中执行,再通过信号把标注后的图像和帧率传回界面显示):

1. QThreadPool+QRunnable方法

# -*- coding: utf-8 -*-
"""
Created on Wed Jul  6 10:05:38 2022

@author: wenqingzhou@gmail.com
"""

import uuid
import cv2
import sys
from PySide6.QtCore import Qt, QSize, QTimer, QThread, Slot, Signal, QRunnable, QThreadPool, QObject
from PySide6.QtWidgets import QApplication, QWidget, QGridLayout, QLabel, QMainWindow, QStatusBar, QMainWindow
from PySide6.QtGui import QPixmap, QImage, QIcon
import torch
from time import time
import numpy as np
# Load the 'yolov5s' model through torch.hub once, at import time. The
# module-level `model` is the detector called by score_frame() below.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')
# Alternative kept for reference: load from a local checkout of the repository.
# model = torch.hub.load(r"D:\Files\yolov5", "yolov5s6", source='local', pretrained=True, force_reload=True)
# Keep the model on the CPU.
model.to('cpu')

# Single-image usage example: img = cv2.imread(PATH_TO_IMAGE)
# `classes` is indexed by integer class id in plot_boxes() to get the label text.
classes = model.names
# Batch usage example: results = model(imgs, size=640)  # includes NMS
def plot_boxes(results, frame, conf_threshold=0.2):
    """Draw labelled bounding boxes for the confident detections onto ``frame``.

    Args:
        results: ``(labels, cord)`` pair as returned by ``score_frame``.
            ``labels[i]`` is the class index of detection ``i`` and ``cord[i]``
            holds ``x1, y1, x2, y2, confidence`` with the coordinates
            normalised so that they can be scaled by the frame size.
        frame: Image array of shape (height, width, ...); it is drawn on in
            place with OpenCV.
        conf_threshold: Minimum confidence a detection needs in order to be
            drawn. Defaults to 0.2, the value that was previously hard-coded.

    Returns:
        The same ``frame`` object, with boxes and class names drawn on it.
    """
    labels, cord = results
    x_shape, y_shape = frame.shape[1], frame.shape[0]
    bgr = (0, 255, 0)  # one colour for every box and caption
    for label, row in zip(labels, cord):
        if row[4] >= conf_threshold:
            # Scale the normalised corners back to pixel coordinates.
            x1, y1 = int(row[0] * x_shape), int(row[1] * y_shape)
            x2, y2 = int(row[2] * x_shape), int(row[3] * y_shape)
            cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 2)
            cv2.putText(frame, classes[int(label)], (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.9, bgr, 2)

    return frame

def score_frame(frame):
    """Run the detector on one frame and split its output into labels and boxes.

    Args:
        frame: A single image; it is handed to the module-level ``model`` as a
            one-element batch with ``size=640``.

    Returns:
        ``(labels, cord)`` NumPy arrays taken from ``results.xyxyn[0]``:
        ``labels`` is the last column and ``cord`` is every column before it.
    """
    detections = model([frame], size=640).xyxyn[0]
    labels = detections[:, -1].cpu().numpy()
    cord = detections[:, :-1].cpu().numpy()
    return labels, cord

class Thread(QThread):
    """Background thread that grabs webcam frames, runs detection and emits them.

    Signals:
        changePixmap(QImage): one annotated frame, scaled to fit 640x480 while
            keeping its aspect ratio.
    """
    changePixmap = Signal(QImage)

    def run(self):
        """Capture, detect and emit frames until interruption is requested.

        Call ``requestInterruption()`` followed by ``wait()`` on the thread to
        end the loop. The capture device is released when the loop exits,
        whether it ends normally or through an exception.
        """
        cap = cv2.VideoCapture(0)
        try:
            while not self.isInterruptionRequested():
                ret, frame = cap.read()
                if not ret:
                    # No frame was delivered: back off briefly instead of
                    # spinning the CPU in a tight loop.
                    QThread.msleep(10)
                    continue

                # https://stackoverflow.com/a/55468544/6622587
                results = score_frame(frame)  # includes NMS
                frame = plot_boxes(results, frame)

                rgbImage = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                h, w, ch = rgbImage.shape
                bytesPerLine = ch * w
                convertToQtFormat = QImage(rgbImage.data, w, h, bytesPerLine, QImage.Format_RGB888)
                # The QImage above only wraps rgbImage's memory, and scaled()
                # may hand back an image that still shares it when no resize
                # is needed. copy() gives the emitted image its own pixel
                # data, so it stays valid after rgbImage is replaced on the
                # next iteration.
                p = convertToQtFormat.scaled(640, 480, Qt.KeepAspectRatio).copy()
                self.changePixmap.emit(p)
        finally:
            cap.release()

class WorkerSignal(QObject):
    """Signal container used by ``Worker``.

    ``Worker.__init__`` creates one instance and ``Worker.run`` emits through
    it (presumably because ``Worker`` derives from ``QRunnable`` rather than
    ``QObject`` and so cannot declare signals itself).
    """
    # Emitted with each processed frame as a QImage.
    data = Signal(QImage)
    # Emitted with the measured frames-per-second value formatted as a string.
    process_time = Signal(str)

class Worker(QRunnable):
    """Pool task that streams annotated webcam frames until ``stop()`` is called.

    Attributes:
        job_id: Random hex identifier generated for this worker.
        signal: ``WorkerSignal`` instance whose ``data`` and ``process_time``
            signals are emitted from ``run``.
        is_running: Loop flag; ``run`` keeps going while it is true.
    """

    def __init__(self):
        super().__init__()
        self.job_id = uuid.uuid4().hex
        self.signal = WorkerSignal()
        self.is_running = True

    def run(self):
        """Capture frames, run detection and emit the result until stopped.

        For every frame read from capture device 0 this emits
        ``signal.data`` with the annotated image (scaled to fit 640x480) and
        ``signal.process_time`` with the frame rate formatted to two
        decimals. The capture device is released when the loop exits,
        whether it ends normally or through an exception.
        """
        cap = cv2.VideoCapture(0)
        try:
            while self.is_running:
                ret, frame = cap.read()
                if not ret:
                    # No frame was delivered: back off briefly instead of
                    # spinning the CPU in a tight loop.
                    QThread.msleep(10)
                    continue

                start_time = time()
                results = score_frame(frame)
                frame = plot_boxes(results, frame)

                rgbImage = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                h, w, ch = rgbImage.shape
                bytesPerLine = ch * w
                convertToQtFormat = QImage(rgbImage.data, w, h, bytesPerLine, QImage.Format_RGB888)
                # The QImage above only wraps rgbImage's memory, and scaled()
                # may hand back an image that still shares it when no resize
                # is needed. copy() gives the emitted image its own pixel
                # data, so it stays valid after rgbImage is replaced on the
                # next iteration.
                p = convertToQtFormat.scaled(640, 480, Qt.KeepAspectRatio).copy()

                # Elapsed time is rounded to milliseconds as before; guard the
                # division so a zero reading yields "inf" without a warning.
                elapsed = np.round(time() - start_time, 3)
                fps = 1 / elapsed if elapsed > 0 else float('inf')
                print(f"Frames Per Second : {fps:.2f}")
                self.signal.data.emit(p)
                self.signal.process_time.emit(f'{fps:.2f}')
        finally:
            cap.release()

    def stop(self):
        """Ask ``run`` to leave its loop after the current iteration."""
        self.is_running = False

class App(QMainWindow):
    """Main window showing the frames and FPS produced by a pooled ``Worker``."""

    def __init__(self):
        super().__init__()
        self.setWindowIcon(QIcon(r"E:\smile.ico"))
        self.initUI()

    def initUI(self):
        """Build the widgets, start the worker on a thread pool and show the window."""
        self.setWindowTitle('App')
        self.resize(640, 480)

        # Image area: a label given the same size as the window.
        self.label = QLabel(self)
        self.label.resize(640, 480)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage('Ready')

        # QThreadPool + QRunnable approach: wire the worker's signals to the
        # slots below, then hand the worker to the pool.
        self.thread_pool = QThreadPool()
        self.worker = Worker()
        signals = self.worker.signal
        signals.data.connect(self.setImage)
        signals.process_time.connect(self.showFPS)
        self.thread_pool.start(self.worker)

        self.show()

    def closeEvent(self, event):
        """Stop the worker and wait for the pool to finish before closing."""
        worker = self.worker
        if worker is not None:
            worker.stop()
            pool = self.thread_pool
            pool.waitForDone()
            pool.clear()
        event.accept()

    @Slot(QImage)
    def setImage(self, image):
        """Show ``image`` in the label."""
        pixmap = QPixmap.fromImage(image)
        self.label.setPixmap(pixmap)

    @Slot(str)
    def showFPS(self, fps):
        """Show the ``fps`` text in the status bar."""
        self.statusbar.showMessage(fps)
if __name__ == '__main__':
    # Reuse the QApplication if one already exists; otherwise create it.
    app = QApplication.instance()
    if app is None:
        app = QApplication(sys.argv)
    # App.__init__ builds the UI, starts the worker and shows the window.
    win = App()
    sys.exit(app.exec())

2. QThread方法

# -*- coding: utf-8 -*-
"""
Created on Sun Jan  7 00:17:49 2024

@author: wentsingzhou@qq.com
"""

import uuid
import cv2
import sys
from PySide6.QtCore import Qt, QSize, QTimer, QThread, Slot, Signal, QRunnable, QThreadPool, QObject
from PySide6.QtWidgets import QApplication, QWidget, QGridLayout, QLabel, QMainWindow, QStatusBar, QMainWindow
from PySide6.QtGui import QPixmap, QImage, QIcon
import torch
from time import time
import numpy as np
# Load the 'yolov5s' model through torch.hub once, at import time. The
# module-level `model` is the detector called by score_frame() below.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')
# Alternative kept for reference: load from a local checkout of the repository.
# model = torch.hub.load(r"D:\Files\yolov5", "yolov5s6", source='local', pretrained=True, force_reload=True)
# Keep the model on the CPU.
model.to('cpu')

# Single-image usage example: img = cv2.imread(PATH_TO_IMAGE)
# `classes` is indexed by integer class id in plot_boxes() to get the label text.
classes = model.names
# Batch usage example: results = model(imgs, size=640)  # includes NMS
def plot_boxes(results, frame, conf_threshold=0.2):
    """Draw labelled bounding boxes for the confident detections onto ``frame``.

    Args:
        results: ``(labels, cord)`` pair as returned by ``score_frame``.
            ``labels[i]`` is the class index of detection ``i`` and ``cord[i]``
            holds ``x1, y1, x2, y2, confidence`` with the coordinates
            normalised so that they can be scaled by the frame size.
        frame: Image array of shape (height, width, ...); it is drawn on in
            place with OpenCV.
        conf_threshold: Minimum confidence a detection needs in order to be
            drawn. Defaults to 0.2, the value that was previously hard-coded.

    Returns:
        The same ``frame`` object, with boxes and class names drawn on it.
    """
    labels, cord = results
    x_shape, y_shape = frame.shape[1], frame.shape[0]
    bgr = (0, 255, 0)  # one colour for every box and caption
    for label, row in zip(labels, cord):
        if row[4] >= conf_threshold:
            # Scale the normalised corners back to pixel coordinates.
            x1, y1 = int(row[0] * x_shape), int(row[1] * y_shape)
            x2, y2 = int(row[2] * x_shape), int(row[3] * y_shape)
            cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 2)
            cv2.putText(frame, classes[int(label)], (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.9, bgr, 2)

    return frame

def score_frame(frame):
    """Run the detector on one frame and split its output into labels and boxes.

    Args:
        frame: A single image; it is handed to the module-level ``model`` as a
            one-element batch with ``size=640``.

    Returns:
        ``(labels, cord)`` NumPy arrays taken from ``results.xyxyn[0]``:
        ``labels`` is the last column and ``cord`` is every column before it.
    """
    detections = model([frame], size=640).xyxyn[0]
    labels = detections[:, -1].cpu().numpy()
    cord = detections[:, :-1].cpu().numpy()
    return labels, cord

class Worker(QThread):
    """Thread that streams annotated webcam frames until ``stop()`` is called.

    Signals:
        data(QImage): one annotated frame, scaled to fit 640x480 while keeping
            its aspect ratio.
        process_time(str): the frame rate formatted to two decimals.
    """
    data = Signal(QImage)
    process_time = Signal(str)

    def __init__(self):
        super().__init__()
        # Loop flag read by run(); cleared by stop().
        self.is_running = True

    def run(self):
        """Capture frames, run detection and emit the result until stopped.

        The capture device is released when the loop exits, whether it ends
        normally or through an exception.
        """
        cap = cv2.VideoCapture(0)
        try:
            while self.is_running:
                ret, frame = cap.read()
                if not ret:
                    # No frame was delivered: back off briefly instead of
                    # spinning the CPU in a tight loop.
                    QThread.msleep(10)
                    continue

                start_time = time()
                results = score_frame(frame)
                frame = plot_boxes(results, frame)

                rgbImage = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                h, w, ch = rgbImage.shape
                bytesPerLine = ch * w
                convertToQtFormat = QImage(rgbImage.data, w, h, bytesPerLine, QImage.Format_RGB888)
                # The QImage above only wraps rgbImage's memory, and scaled()
                # may hand back an image that still shares it when no resize
                # is needed. copy() gives the emitted image its own pixel
                # data, so it stays valid after rgbImage is replaced on the
                # next iteration.
                p = convertToQtFormat.scaled(640, 480, Qt.KeepAspectRatio).copy()

                # Elapsed time is rounded to milliseconds as before; guard the
                # division so a zero reading yields "inf" without a warning.
                elapsed = np.round(time() - start_time, 3)
                fps = 1 / elapsed if elapsed > 0 else float('inf')
                print(f"Frames Per Second : {fps:.2f}")
                self.data.emit(p)
                self.process_time.emit(f'{fps:.2f}')
        finally:
            cap.release()

    def stop(self):
        """Ask ``run`` to leave its loop after the current iteration."""
        self.is_running = False

class App(QMainWindow):
    """Main window showing the frames and FPS produced by a ``Worker`` thread."""

    def __init__(self):
        super().__init__()
        # Define the attribute before initUI() shows the window, so that
        # closeEvent() can always read it.
        self.worker = None
        self.setWindowIcon(QIcon(r"E:\smile.ico"))
        self.initUI()

    def initUI(self):
        """Build the widgets and show the window."""
        self.setWindowTitle('App')
        self.resize(640, 480)
        self.label = QLabel(self)
        self.label.resize(640, 480)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage('Ready')

        self.show()

    def closeEvent(self, event):
        """Stop the worker thread, if any, and wait for it before closing."""
        if self.worker is not None:
            self.worker.stop()
            self.worker.wait()
        event.accept()

    # QThread approach
    def startWorker(self):
        """Create the worker thread, connect its signals and start it.

        Does nothing when a worker is already running, so a repeated call
        cannot drop the reference to a live thread.
        """
        if self.worker is not None and self.worker.isRunning():
            return
        self.worker = Worker()
        self.worker.data.connect(self.setImage)
        self.worker.process_time.connect(self.showFPS)
        self.worker.start()

    @Slot(QImage)
    def setImage(self, image):
        """Show ``image`` in the label."""
        self.label.setPixmap(QPixmap.fromImage(image))

    @Slot(str)
    def showFPS(self, fps):
        """Show the ``fps`` text in the status bar."""
        self.statusbar.showMessage(fps)

if __name__ == '__main__':
    # Reuse the QApplication if one already exists; otherwise create it.
    app = QApplication.instance()
    if app is None:
        app = QApplication(sys.argv)
    # App.__init__ builds and shows the window; the worker is started explicitly.
    win = App()
    win.startWorker()
    sys.exit(app.exec())
文章来源:https://blog.csdn.net/ouening/article/details/135434428
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。