有这样一个需求,基于webrtc实现的web实时流播放器,有4分屏,最大同时显示4个图像rtsp摄像头的图像。布局是固定的4分屏,但是分屏所显示的摄像头图像可以变化。
那么把它抽象为技术实现,就是web同时会与ZLMediaKit建立4个pc对象,这个4个pc对象建立后,就不会销毁(直到web播放器关闭),由ZLMediaKit根据web请求的rtsp摄像头标识,将不同摄像头的流转给web。
在ZLMediaKit以webrtc player为拉流端,以rtsp为推流端,是通过RingBuffer
来实现转流,直接转发的是rtp包(因为webrtc和rtsp媒体都是通过rtp来打包)。
那么切换不同的源时,就是将webrtc player的连接到不同的rtsp media source中的RingBuffer
上,显然在切换时,应该是先断掉上一次的RingBuffer
连接,再连接一个新的rtsp media source。
但是粗看RingBuffer
的接口只提供了attch
接口,没有提供detach
接口。
RingBuffer
没有提供detach
接口,但是却提供了一个readerCount
接口,返回RingBuffer
上连了多少个连流端。显然肯定有一种方式能断开RingBuffer
的连接,并且RingBuffer
是能感知到的。
仔细查看代码,发现attch
接口返回一个std::shared_ptr<RingReader>
,也就是说外部对RingReader对象至少是有部分所有权。再看_RingReaderDispatcher
的代码,发现存储RingReader对象的map里是一个std::weak_ptr
,如下:
std::unordered_map<void *, std::weak_ptr<RingReader>> _reader_map;
所以attch
返回的reader对象的所有权实际是完全归外部拥有。
而判断Reader
与RingBuffer
的连接是否存在是在_RingReaderDispatcher
的write
方法中,如下:
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
通过lock后,判断Reader
对象是否被释放。
所以如果要切换到不同的RingBuffer
,只需通过不同的RingBuffer
的attach
方法获取一个新的即可,如下例子:
#include <iostream>
#include "Util/logger.h"
#include "Util/util.h"
#include "Util/RingBuffer.h"
using namespace std;
using namespace toolkit;
//最大size为100,缓存(max_gop_size)为1
RingBuffer<int>::Ptr g_ringBuf(new RingBuffer<int>(100,nullptr,1));
//第二个RingBuffer
RingBuffer<int>::Ptr g_ringBuf2(new RingBuffer<int>(100,nullptr,1));
void onReadEvent1(int i) {
std::cout<<i<<std::endl;
DebugL<<i;
}
void onDetachEvent(){
WarnL;
}
int main() {
//初始化日志
auto fileChannel = std::make_shared<toolkit::FileChannel>("FileChannel", toolkit::exeDir());
Logger::Instance().add(fileChannel);
Logger::Instance().setWriter(std::make_shared<toolkit::AsyncLogWriter>());
std::shared_ptr<RingBuffer<int>::RingReader> reader = nullptr;
//线程1
auto poller1 = EventPollerPool::Instance().getPoller(false);
poller1->async([&]{
//设置为使用cache
reader = g_ringBuf->attach(poller1,true);
reader->setReadCB([](int i){
onReadEvent1(i);
});
reader->setDetachCB([](){
onDetachEvent();
});
});
//在第一个RingBuffer中写入数据
//GOP 011
g_ringBuf->write(0,true);
g_ringBuf->write(1,false);
g_ringBuf->write(1,false);
//GOP 022
g_ringBuf->write(0,true);
g_ringBuf->write(2,false);
g_ringBuf->write(2,false);
//GOP 033
g_ringBuf->write(0,true);
g_ringBuf->write(3,false);
g_ringBuf->write(3,false);
//在第二个RingBuffer中写入数据
//g_ringBuf2写入
g_ringBuf2->write(0,true);
g_ringBuf2->write(5,false);
g_ringBuf2->write(5,false);
//切换到g_ringBuf2
poller1->async([&]{
reader = g_ringBuf2->attach(poller1,true);
reader->setReadCB([](int i){
onReadEvent1(i);
});
reader->setDetachCB([](){
onDetachEvent();
});
});
DebugL<<"===> sleep";
std::this_thread::sleep_for(std::chrono::seconds(10));
}
上面的例子中有两个RingBuffer
,分别是g_ringBuf
和g_ringBuf2
,分别往两个buffer中写数据。
RingReader
对象先连接到g_ringBuf
,然后切换到g_ringBuf2
。
通过attach
获取的RingReader
对象最好不要是局部对象,下面的代码是有问题的:
poller1->async([&]{
//设置为使用cache
auto reader = g_ringBuf->attach(poller1,true);
reader->setReadCB([](int i){
onReadEvent1(i);
});
reader->([](){
DebugL<<"===> detach cb";
onDetachEvent();
});
});
RingReader
对象的生命周期只与任务对象一样长,这样显然不是原始要表达的意图,应该改成如下,在外部定义RingReader
对象。
std::shared_ptr<RingBuffer<int>::RingReader> reader = nullptr;
poller1->async([&]{
//设置为使用cache
reader = g_ringBuf->attach(poller1,true);
reader->setReadCB([](int i){
onReadEvent1(i);
});
reader->([](){
DebugL<<"===> detach cb";
onDetachEvent();
});
});
回到前面说的需求,在WebRtcPlayer
中切换源,但是它并未提供这样的功能,根据上面的分析,可以加一个changeSrc
的方法来实现该功能,代码如下:
void WebRtcPlayer:: changeSrc(const MediaSource::Ptr &src) {
_play_src = src;
if (!_play_src) {
onShutdown(SockException(Err_shutdown, "rtsp media source was shutdown"));
return ;
}
auto playSrc = _play_src.lock();
if (!playSrc) {
onShutdown(SockException(Err_shutdown, "rtsp media source was shutdown"));
return ;
}
if (canSendRtp()) {
playSrc->pause(false);
_reader = playSrc->getRing()->attach(getPoller(), true);
weak_ptr<WebRtcPlayer> weak_self = static_pointer_cast<WebRtcPlayer>(shared_from_this());
weak_ptr<Session> weak_session = static_pointer_cast<Session>(getSession());
_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); });
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
//TraceL<<"send track type:"<<rtp->type<<" ts:"<<rtp->getStamp()<<" ntp:"<<rtp->ntp_stamp<<" size:"<<rtp->getPayloadSize()<<" i:"<<i;
if (TrackVideo == rtp->type) {
strong_self->onSendRtp(rtp, ++i == pkt->size());
}
});
});
_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
strong_self->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
});
}
}
就是从新的src再attch
下,获取RingReader
对象,再设置相关回调。