ZLMediaKit中的RingBuffer

发布时间:2023年12月24日

前面的文章讲到ZLMediaKit转流,提到过RingBuffer,它是比较核心的数据结构。这篇文章就来讲讲RingBuffer的用法。

RingBuffer的类体系

RingBuffer是由多个类组成,分为两大功能:存储和数据分发。
存储功能由类RingStorage实现,分发功能由RingReaderDispactcher,RingDelegateRingReader)。下面是它们的类图:
ZLMediaServer RingBuffer (1).jpg
RingBuffer类是"大总管",封装整个体系的功能,提供对外的接口。

数据存储

RingStorage是数据存储类,它是一个循环队列,有最大容量定义,从尾部插入最新数据,当队列满了,从头部删除老数据。

它的对所存储的数据的定义,借用了视频GOP的概念。
将GOP视为一个元素(一个视频GOP中包含多个视频nalu)。
RingStorage中将GOP称为更适合,里有包含的更基本的元素,下面是它的定义:

template <typename T>
class _RingStorage 

基本元素为模板类型,可以存入任意类型。

它包含了一个类型为GopType容器,如下:
using GopType = List<List<std::pair<bool, T>>>;
GopType _data_cache;
它是一个List,元素也是一个List。可见是以组为单位存储数据。

在视频数据中,GOP包含的是两个关键帧之间的的nalu数据,所以它的write接口有一个是否为key的参数,如下:

void write(T in, bool is_key = true)

它的构造函数如下:

_RingStorage(size_t max_size, size_t max_gop_size)

max_size是指最大元素个数,就是GOP的数量*GOP的大小。
max_gop_size是指最大GOP的个数。

下面是一个使用示例:

//RingBuffer是_RingStorage的封装
//最大size为100,GOP最大个数为1
RingBuffer<int>::Ptr g_ringBuf(new RingBuffer<int>(100,nullptr,1));
//GOP 011
g_ringBuf->write(0,true);
g_ringBuf->write(1,false);
g_ringBuf->write(1,false);
//GOP 022
g_ringBuf->write(0,true);
g_ringBuf->write(2,false);
g_ringBuf->write(2,false);
//GOP 033
g_ringBuf->write(0,true);
g_ringBuf->write(3,false);
g_ringBuf->write(3,false);

上面的例子将0作为key(当然可以是任意值),两个key之间就是GOP的数据(GOP的长度可以是任意长度)。
因为定义的GOP个数为1,所以buffer最终缓存的是0,3,3。前面的0,1,1,0,2,2都被删除了。
对视频nalu数据来说,**RingStorage**就是一个GOP buffer,缓存最少一个GOP的数据。这样可以保证快速出图。

数据分发

先看RingBuffer的整体结构图

RingBuffer结构图.jpg

RingBuffer中的数据结构std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map,是以EventPoller对象为key,所以它可以跨线程的分发数据。

RingReaderDispatcher内有多个RingReader对象,是数据流向的目的端。

RingReader就是数据的出口,调用RingBuffer类的attch方法获取一个RingReader对象,再调用setReadCB方法设置数据回调,就可以取到数据了。

attch有一个EventPoller类型的形参,需要传入的是目的对象所在的线程。

这篇文章中提到过,MediaSource对象作为数据源,内部都有一个RingBuffer,通过它拿到一个RingReader对象后就可以取到这个MediaSource的源了。

比如,以rtmp推流,http-flv拉流时,那么连接rtmp源和flv的基本代码结构如下:

//poller为_ring_reader对象所在的线程
_ring_reader = media->getRing()->attach(poller);
//获取源信息的回调
_ring_reader->setGetInfoCB(...);
//当源关闭时的回调
_ring_reader->setDetachCB(...);
//设置读取数据的回调
_ring_reader->setReadCB(...);

具体的代码见,void FlvMuxer::start方法。

下面是RingBuffer中数据流转图

ZLMediaKit RingBuffer数据分发图.jpg
通过write写入数据,数据从RingBufferRingReaderDispatcher,再到RingReader,再通过onReadCB回调至dst。

RingBuffer使用的例子

#include <iostream>
#include "Util/logger.h"
#include "Util/util.h"
#include "Util/RingBuffer.h"

using namespace std;
using namespace toolkit;

//创建一个RingBuffer对象,存储int元素
//最大size为100,缓存(max_gop_size)为1
RingBuffer<int>::Ptr g_ringBuf(new RingBuffer<int>(100,nullptr,1));

//数据回调
void onReadEvent1(int i) {
    std::cout<<i<<std::endl;
}

//src释放时的回调
void onDetachEvent(){
    WarnL;
}

int main() {
    //初始化日志
    auto fileChannel = std::make_shared<toolkit::FileChannel>("FileChannel", toolkit::exeDir());
    Logger::Instance().add(fileChannel);
    Logger::Instance().setWriter(std::make_shared<toolkit::AsyncLogWriter>());

    //RingBuffer reader线程
    auto poller1 = EventPollerPool::Instance().getPoller(false);
    //在线程中设置reader
    poller1->async([&]{
        //通过attach方法获取一个RingReader,设置为使用cache
        auto reader = g_ringBuf->attach(poller1,true);
        //设置数据读取回调
        reader->setReadCB([](int i){
            onReadEvent1(i);
        });
    	//设置src关闭时的回调
        reader->setDetachCB([](){
            onDetachEvent();
        });
    });

    //在主线程中写入数据
    //GOP 011
    g_ringBuf->write(0,true);
    g_ringBuf->write(1,false);
    g_ringBuf->write(1,false);
    //GOP 022
    g_ringBuf->write(0,true);
    g_ringBuf->write(2,false);
    g_ringBuf->write(2,false);
    //GOP 033
    g_ringBuf->write(0,true);
    g_ringBuf->write(3,false);
    g_ringBuf->write(3,false);

    std::this_thread::sleep_for(std::chrono::seconds(10));
}
文章来源:https://blog.csdn.net/mo4776/article/details/135185986
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。