Dora-rs 机器人框架学习教程(2)——摄像头数据读取

发布时间:2024年01月04日

目标:编写一个Python节点读取USB摄像头数据。

1 环境依赖

依赖条件 web camera 和 cv2

sudo apt install python3-pip
pip install numpy opencv-python pyarrow

2 创建工程

新建一个文件夹 yolo_project (后续运行yolo目标检测),从dora 官方 github拷贝文件

wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/webcam.py
wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/plot.py
wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/utils.py

创建数据流文件 dataflow_webcam.yml

nodes:
  - id: web_cam
    operator:
      python: op_webcam/webcam.py
      inputs:
        tick: dora/timer/millis/30
      outputs:
        - image
  - id: plot
    operator:
      python: op_webcam/plot.py
      inputs:
        image: web_cam/image
dora up
dora start dataflow_webcam.yml --attach --hot-reload --name webcam

上述参数中:
–attach:等待数据流完成后再返回。
–hot-reload:在数据流运行时修改Python Operator。
–name:命名数据流(可能比 UUID 简洁)。

程序运行以后会在桌面上新建一个窗口显示图像,持续20秒左右。
在这里插入图片描述若需要结束数据流可以使用 dora stop 指令或是用 ctrl+C

3 程序功能分析

在上述的功能中,从数据流文件 dataflow_webcam.yml可以看出 程序使用了两个python文件分别是 webcam.py 和 plot.py。webcam.py 内容如下,其利用opencv中的capture类读取USB 摄像头数据,将图像数据序列化以后发送到节点 plot.py 显示在窗口中。

通过yml文件中的参数(tick: dora/timer/millis/30)可以控制 webcam.py 中 def on_event(…)函数调用的周期,这里的tick参数有点类似时钟节拍的含义,每来一个节拍就进入 on_event(…)函数一次

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from typing import Callable, Optional

import os
import cv2
import numpy as np
import pyarrow as pa

fr建工程om dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

    def on_event(
        self,
        dora_event: str,
        send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
    ) -> DoraStatus:
        match dora_event["type"]:
            case "INPUT":
                ret, frame = self.video_capture.read()
                if ret:
                    frame = cv2.resize(frame, (CAMERA_WIDTH, CA建工程MERA_HEIGHT))

                ## Push an error image in case the camera is not available.
                else:
                    frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                    cv2.putText(
                        frame,
                        "No Webcam was found at index %d" % (CAMERA_INDEX),
                        (int(30), int(30)),
                        font,
                        0.75,
                        (255, 255, 255),
                        2,
                        1,
                    )

                send_output(
                    "image",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
            case "STOP":
                print("received stop")
            case other:
                print("received unexpected event:", other)

        if time.time() - self.start_time < 20:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()

4 参考资料

[1] https://dora.carsmos.ai/docs/guides/getting-started/webcam_plot

dora-rs目前资料较少 欢迎大家点赞在评论区交流讨论(cenruping@vip.qq.com) O(∩_∩)O
或者加群水一波(1149897304)

文章来源:https://blog.csdn.net/crp997576280/article/details/135376635
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。