基于muduo库的RtmpServer

发布时间:2024年01月14日

Rtmp服务器

相关代码开源在:https://github.com/xuqihong1998/RtmpServer
如果对您有帮助,还请您帮我点个star,谢谢

1、Rtmp协议概念

? RTMP协议是**Real Time Message Protocol(实时信息传输协议)**的缩写,它是由Adobe公司提出的一种应用层的协议,用来解决多媒体数据传输流的多路复用(Multiplexing)和分包(packetizing)的问题。RTMP协议是要靠底层可靠的传输层协议(通常是TCP)来保证信息传输的可靠性的,默认使用端口1935。在基于传输层协议的链接建立完成后,RTMP协议也要客户端和服务器通过“握手”来建立基于传输层链接之上的RTMP Connection链接。RTMP Connection成功后会传输一些控制信息,如CreateStream命令会创建一个Stream链接,用于传输具体的音视频数据和控制这些信息传输的命令信息。

2、Rtmp建立连接步骤

2.1 相关概念

一个 RTMP 连接以握手开始。先进行TCP握手后再进行RTMP握手。RTMP 握手由三个固定长度的块组成,有简单握手和复杂握手两种方式,两种握手方式信息流转的过程是相同的,只是消息中携带的信息不同。以下的讲述和代码实现都基于Rtmp的简单握手来实现。

在这里插入图片描述

? 上图是Rtmp建立连接前的握手步骤,不过一般的发送步骤是这样的:

? |client|Server |
? |---C0 + C1---> |
? |<--S0 + S1 + S2--|
? |---- C2 ---->|

2.2 相关代码

2.2.1 rtmphandshake.h
#ifndef RTMP_HANDSHAKE_H
#define RTMP_HANDSHAKE_H

#include "../util/HandleBit.h"
#include <muduo/net/Buffer.h>


namespace xqh {
class RtmpHandshake
{
public:
  	/*
  	定义了Rtmp握手的等待状态
  	*/
    enum State
    {
        HANDSHAKE_C0C1, 
        HANDSHAKE_S0S1S2,
        HANDSHAKE_C2,
        HANDSHAKE_COMPLETE
    };

    RtmpHandshake(State state);
    virtual ~RtmpHandshake();
    int Parse(muduo::net::Buffer& buffer, char* res_buf, uint32_t res_buf_size);
    int BuildC0C1(char* buf, uint32_t buf_size);
    bool IsCompleted() const
    {
        return handshake_state_ == HANDSHAKE_COMPLETE;
    }
private:
    State handshake_state_;
    std::shared_ptr<char> packet_rtmp_s1;
};
}

#endif
2.2.2 rtmphandshake.cpp

下面是核心解析的代码,主要是从协议包信息,以及此时的网络握手链接中判断

int RtmpHandshake::Parse(muduo::net::Buffer& buffer, char* res_buf, uint32_t res_buf_size)
{
    uint8_t *buf = (uint8_t*)buffer.peek();
    uint32_t buf_size = buffer.readableBytes();
    uint32_t pos = 0;
    uint32_t res_size = 0;
    std::random_device rd;

    if(handshake_state_ == HANDSHAKE_S0S1S2)
    {
        if(buf_size < (1 + 1536 + 1536))
        {
            return res_size;
        }

        if(buf[0] != RTMP_VERSION)
        {
            LOG_ERROR << "unsupport rtmp version: " << buf[0] << "\n";
        }

        pos += 1 + 1536 + 1536;
        res_size = 1536;
        memcpy(res_buf,buf+1,1536);
        handshake_state_ = HANDSHAKE_COMPLETE;
    }
    else if(handshake_state_ == HANDSHAKE_C0C1)
    {
        if(buf_size < 1537)
        {
            return res_size;
        }
        else
        {
            if(buf[0] != RTMP_VERSION)
            {
                return -1;
            }

            pos += 1537;
            res_size = 1+1536+1536;
            memset(res_buf,0,1537);
            res_buf[0] = RTMP_VERSION;

            char* p = res_buf;
            p += 9;
            char* temp = res_buf;
            temp += 1;
            for(int i = 0; i < 1528; i++)
            {
                *p++ = rd();
            }
            memcpy(p, buf+1,1536);
            //缓存s1包
            memcpy(packet_rtmp_s1.get(), temp, 1536);
            handshake_state_ = HANDSHAKE_C2;
        }
    }
    else if(handshake_state_ == HANDSHAKE_C2)
    {
        if(buf_size < 1536)
        {
            return res_size;
        }
        else if (strncmp(packet_rtmp_s1.get(),buffer.peek(), 1536))
        {
            pos = 1536;
            handshake_state_ = HANDSHAKE_COMPLETE;
            LOG_INFO << "handshake complete";
        }

        else
        {
            pos = 1536;
            handshake_state_ = HANDSHAKE_COMPLETE;
        }
    }
    else
    {
        return -1;
    }
    buffer.retrieve(pos);
    return res_size;
}

3、Rtmp推流步骤

3.1 推流过程

  • 客户端发送握手请求,和服务器完成握手。

  • 客户端发送命令消息中的**“连接”(connect)**到服务器,请求与一个服务应用实例建立连接。

  • 服务器接收到连接命令消息后,发送**确认窗口大小(Window Acknowledgement Size)**到客户端,同时连接到连接命令中提到的应用程序。

  • 服务器发送设置带宽协议(Set Peer Bandwidth)消息到客户端。

  • 客户端处理设置带宽协议控制消息后,发送**确认窗口大小(Window Acknowledgement Size)**到服务器端。

  • 服务器发送用户控制消息中的**“流开始”(StreamBegin)**消息到客户端,通知客户端流成功创建,可用于通信。

  • 服务端发送connect的**“应答消息”(_result)**,通知客户端连接的状态。

  • 客户端发送网络连接命令的**“创建流”(createStream)消息到服务端,以创建消息通信的逻辑通道。音频、视频和元数据的发布通过使用createStream命令创建的流通道**执行。服务端发送createStream的“应答消息”(_result)。

  • 客户端发送网络流命令的**“发布”(publish)**到服务端,将命名流发布到服务器。其它客户端可以使用此流名来播放流,接收发布的音频,视频,以及其他数据消息。

  • 客户端发送命令消息或音视频数据至服务端。

在这里插入图片描述

上图表示了一个客户端向服务器推流的过程

3.2 相关代码

? 首先,这块代码展现的是当服务器接收到文件描述符的可读事件时,对buffer里的数据进行解析的流程。主要着重于判断连接握手的成立与否,根据握手的情况对rtmp数据进行解析。

bool RtmpConnection::OnRead(muduo::net::Buffer& buffer)
{
    bool ret = true;
    if(handshake_->IsCompleted())
    {
      //这里的话就是握手建立后,对chunk数据进行处理
        ret = HandleChunk(buffer);
    }
    else
    {
      //这里的话就是对握手逻辑的处理
        std::shared_ptr<char> res(new char[4096], std::default_delete<char[]>());
        int res_size = handshake_->Parse(buffer, res.get(), 4096);
        if(res_size < 0)
        {
            ret = false;
        }

        if(res_size > 0)
        {
            conn_->send(res.get(), res_size);
        }

        if(handshake_->IsCompleted())
        {
            if(buffer.readableBytes() > 0)
            {
                ret = HandleChunk(buffer);
            }

            if(connection_mode_ == RTMP_PUBLISHER ||
                connection_mode_ == RTMP_CLIENT)
            {
                this->SetChunkSize();
                this->Connect();
            }
        }
    }
    return ret;
}

? 接着,解析chunk数据包,这里只展现了大致的流程

bool RtmpConnection::HandleChunk(muduo::net::Buffer& buffer)
{
    int ret = -1;
    do
    {
        RtmpMessage rtmp_msg;
        ret = rtmp_chunk_->Parse(buffer, rtmp_msg);
        if(ret >= 0)
        {
            if(rtmp_msg.IsCompleted())
            {
                if(!HandleMessage(rtmp_msg))
                {
                    return false;
                }
            }

            if(ret == 0)
            {
                break;
            }
        }
        else if(ret < 0)
        {
            return false;
        }
    } while (buffer.readableBytes() > 0);
    
    return true;
}

? 接着,将chunk序列化为我们自己定义的消息对象之后,根据消息对象的消息类型,对数据进行下一步的处理

bool RtmpConnection::HandleMessage(RtmpMessage& rtmp_msg)
{
    bool ret = true;
    switch (rtmp_msg.type_id)
    {
    case RTMP_VIDEO:
        ret = HandleVideo(rtmp_msg);
        break;
    case RTMP_AUDIO:
        ret = HandleAudio(rtmp_msg);
        break;
    case RTMP_INVOKE:
        ret = HandleInvoke(rtmp_msg);
        break;
    case RTMP_NOTIFY:
        ret = HandleNotify(rtmp_msg);
        break;
    case RTMP_FLEX_MESSAGE:
        LOG_INFO << "unsupported rtmp flex message.\n";
        ret = false;
        break;
    case RTMP_SET_CHUNK_SIZE:
        rtmp_chunk_->SetInChunkSize(ReadUint32BE(rtmp_msg.payload.get()));
        break;
    case RTMP_BANDWIDTH_SIZE:
        break;
    case RTMP_FLASH_VIDEO:
        LOG_INFO << "unsupported rtmp flash video.\n";
        ret = false;
        break;
    case RTMP_ACK:
        break;
    case RTMP_ACK_SIZE:
        break;
    case RTMP_USER_EVENT:
        break;
    default:
        LOG_INFO << "unkown message type: " << rtmp_msg.type_id << "\n";
        break;
    }

    return ret;
}

3.3 详细步骤拆分

3.3.1 处理命令消息

? 从rtmp包中的payload数据中,取出amf数据,amf数据的第一行就代表要处理的命令信息

bool RtmpConnection::HandleInvoke(RtmpMessage& rtmp_msg)
{
    bool ret = true;
    amf_decoder_.reset();
    int bytes_used = amf_decoder_.decode((const char*)rtmp_msg.payload.get(), rtmp_msg.length, 1);
    if(bytes_used < 0)
    {
        return false;
    }
    std::string method = amf_decoder_.getString();
    if(connection_mode_ == RTMP_PUBLISHER || connection_mode_ == RTMP_CLIENT)
    {
        bytes_used += amf_decoder_.decode(rtmp_msg.payload.get() + bytes_used, rtmp_msg.length - bytes_used);
        if(method == "_result")
        {
            ret = HandleResult(rtmp_msg);
        }
        else if(method == "onStatus")
        {
            ret = HandleOnStatus(rtmp_msg);
        }
    }
    else if(connection_mode_ == RTMP_SERVER)
    {
        if(rtmp_msg.stream_id == 0)
        {
            bytes_used += amf_decoder_.decode(rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
            if(method == "connect")
            {
                ret = HandleConnect();
            }
            else if(method == "createStream")
            {
                ret = HandleCreateStream();
            }
        }
        else if(rtmp_msg.stream_id == stream_id_)
        {
            bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 3);
            stream_name_ = amf_decoder_.getString();
            stream_path_ = "/" + app_ + "/" + stream_name_;
            if((int)rtmp_msg.length > bytes_used)
            {
                bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
            }
            if(method == "publish")
            {
                ret = HandlePublish();
            }
            else if(method == "play")
            {
                ret = HandlePlay();
            }
            else if(method == "play2")
            {
                ret = HandlePlay2();
            }
            else if(method == "DeleteStream")
            {
                ret = HandleDeleteStream();
            }
            else if(method == "releaseStream")
            {

            }
        }
    }

    return ret;
}
3.3.2 处理connect消息
bool RtmpConnection::HandleConnect()
{
    if(!amf_decoder_.hasObject("app")) {
        return false;
    }

    AmfObject amfObj = amf_decoder_.getObject("app");
    app_ = amfObj.amf_string;
    if(app_ == "") {
        return false;
    }

    SendAcknowledgement();
    SetPeerBandwidth();   
    SetChunkSize();

    AmfObjects objects;    
    amf_encoder_.reset();
    amf_encoder_.encodeString("_result", 7);
    amf_encoder_.encodeNumber(amf_decoder_.getNumber());

    objects["fmsVer"] = AmfObject(std::string("FMS/4,5,0,297"));
    objects["capabilities"] = AmfObject(255.0);
    objects["mode"] = AmfObject(1.0);
    amf_encoder_.encodeObjects(objects);
    objects.clear();
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetConnection.Connect.Success"));
    objects["description"] = AmfObject(std::string("Connection succeeded."));
    objects["objectEncoding"] = AmfObject(0.0);
    amf_encoder_.encodeObjects(objects);  

    SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());
    return true;
}

接着,向推流端(client)发送发送确认窗口大小(Window Acknowledgement Size)设置带宽协议(Set Peer Bandwidth)消息设置窗口大小(streamBegin),发送完毕则发送connected的应答(_result)信息。

3.3.3 处理createStream信息
bool RtmpConnection::HandleCreateStream()
{ 
	int stream_id = rtmp_chunk_->GetStreamId();

	AmfObjects objects;
	amf_encoder_.reset();
	amf_encoder_.encodeString("_result", 7);
	amf_encoder_.encodeNumber(amf_decoder_.getNumber());
	amf_encoder_.encodeObjects(objects);
	amf_encoder_.encodeNumber(stream_id);

	SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());
	stream_id_ = stream_id;
	return true;
}

这个就比较简单了,就是从amf中获取当前stream的id,然后再回传确认包

3.3.4 处理publish信息

首先先从rtmp包中解析出要推的流的名称

        else if(rtmp_msg.stream_id == stream_id_)
        {
            bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 3);
            stream_name_ = amf_decoder_.getString();
            stream_path_ = "/" + app_ + "/" + stream_name_;
            if((int)rtmp_msg.length > bytes_used)
            {
                bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
            }
            if(method == "publish")
            {
                ret = HandlePublish();
            }
       ......
       }

在这里插入图片描述

根据上图可以看到 在payload中的amf第四行代表要推的流的名称,此时将其解析出来

bool RtmpConnection::HandlePublish()
{
    std::cout << "[Publish] app: "<< app_.c_str()<< " stream name: " << stream_name_.c_str() <<"stream path: " <<  stream_path_.c_str() << std::endl;

	auto server = rtmp_server_.lock();
	if (!server) {
        std::cout << "UnFound Server!" << std::endl;
		return false;
	}

    AmfObjects objects; 
    amf_encoder_.reset();
    amf_encoder_.encodeString("onStatus", 8);
    amf_encoder_.encodeNumber(0);
    amf_encoder_.encodeObjects(objects);

    bool is_error = false;

    if(server->HasPublisher(stream_path_)) {
		is_error = true;
        objects["level"] = AmfObject(std::string("error"));
        objects["code"] = AmfObject(std::string("NetStream.Publish.BadName"));
        objects["description"] = AmfObject(std::string("Stream already publishing."));
        std::cout << "Stream already publishing." << std::endl;
    }
    else if(connection_state_ == START_PUBLISH) {
		is_error = true;
        objects["level"] = AmfObject(std::string("error"));
        objects["code"] = AmfObject(std::string("NetStream.Publish.BadConnection"));
        objects["description"] = AmfObject(std::string("Connection already publishing."));
        std::cout << "Connection already publishing." << std::endl;
    }
    /* else if(0)  {
        // 认证处理 
    } */
    else {
        objects["level"] = AmfObject(std::string("status"));
        objects["code"] = AmfObject(std::string("NetStream.Publish.Start"));
        objects["description"] = AmfObject(std::string("Start publising."));
      
      //通过session来管理
		server->AddSession(stream_path_);
		rtmp_session_ = server->GetSession(stream_path_);
        std::cout << "publsh: " << stream_path_ << std::endl;

		if (server) {
			server->NotifyEvent("publish.start", stream_path_);
		}
    }

    amf_encoder_.encodeObjects(objects);     
    SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());

    if(is_error) {
        // Close ?
    }
    else {
        connection_state_ = START_PUBLISH;
		is_publishing_ = true;
    }

    auto session = rtmp_session_.lock();
    if(session) {
		session->SetGopCache(max_gop_cache_len_);
		session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
    }        

    return true;
}

根据服务器此时的连接状态 做相应的处理

通过RtmpSession这个类,管理stream_path所对应的处理逻辑。每个stream_path和一个session对应,stream_path是唯一的。

接下来,发送一个回传包,表示说开始处理消息,服务器开始推流。

3.3.5 处理数据消息信息

这里的数据消息信息主要包含后续音频视频的编解码信息,服务器和客户端后续可以通过这些元数据对数据进行编解码。

客户端在发送publish消息之后,会给服务器发送setDataFrame和onMetaData消息

bool RtmpConnection::HandleNotify(RtmpMessage& rtmp_msg)
{
    amf_decoder_.reset();
    int bytes_used = amf_decoder_.decode((const char *)rtmp_msg.payload.get(), rtmp_msg.length, 1);
    if(bytes_used < 0)
    {
        return false;
    }

    if(amf_decoder_.getString() == "@setDataFrame")
    {
        amf_decoder_.reset();
        bytes_used = amf_decoder_.decode((const char *)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 1);
        if(bytes_used < 0)
        {
            return false;
        }
        if(amf_decoder_.getString() == "onMetaData")
        {
            amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
            meta_data_ = amf_decoder_.getObjects();
            auto server = rtmp_server_.lock();
            if(!server)
            {
                return false;
            }
            auto session = rtmp_session_.lock();
            if(session)
            {
                session->SetMetaData(meta_data_);
                session->SendMetaData(meta_data_);
            }
        }
    }
    return true;
}
3.3.6 处理音视频信息

在连接正式建立之后,就可以处理音视频信息了

1、每一个message就是一帧数据。对于flv的tag而言,就是对应rtmp每个message,一个tag就是一个message,是一一对应的关系;相当于每一个tag都封装成一个message。

2、RTMP 块流使用Message Type ID=8 作为音频数据,flv的tag header->tag type也用8来表示音频。通常音频流的csid是4(也可以自定义),音频流的每一个chunk的csid都是相同的。

在这里插入图片描述

3、RTMP 块流使用Message Type ID=9 作为视频数据,flv的tag header->tag type也用9来表示音频。通常视频流的csid是6(也可以自定义),视频流的每一个chunk的csid都是相同的。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4、Rtmp拉流步骤

4.1拉流过程
  • 客户端发送握手请求,和服务器完成握手。
  • 客户端发送命令消息中的**“连接”(connect)**到服务器,请求与一个服务应用实例建立连接。
  • 客户端发送网络连接命令的**“创建流”(createStream)**消息到服务端,以创建消息通信的逻辑通道。音频、视频和元数据的发布通过使用createStream命令创建的流通道执行。服务端发送createStream的“应答消息”(_result)。
  • 客户端发送网络流命令中的“**播放”(play)**到服务端。
  • 服务端发送协议控制消息中的**“设置块大小”(Set Chunk Size)**到客户端设置chunk大小。
  • 服务器发送另一个协议控制消息(用户控制),指定事件“StreamIsRecorded”和该消息中的流ID。消息在前2字节中携带事件类型,在后4字节中携带流ID。
  • 服务端发送用户控制消息中的**“流开始”(StreamBegin)**消息到客户端,通知客户端流成功创建,可用于通信。
  • 如果客户端发送的播放命令成功,则服务器发送onStatus命令消息NetStream.Play.Start和NetStream.Play.Reset。仅当客户端发送的播放命令设置了重置标志时,服务器才会发送NetStream.Play.Reset。如果未找到要播放的流,服务器将发送onStatus消息NetStream.Play.StreamNotFound。
  • 服务端发送音视频数据到客户端。
    在这里插入图片描述

上图表示了一个客户端向服务器拉流的过程

4.2 详细步骤拆分
4.2.1 处理connect消息

与3.3.1相同

4.2.2 处理createstream消息

与3.3.3相同

4.2.3 处理play消息
bool RtmpConnection::HandlePlay()
{
	//LOG_INFO("[Play] app: %s, stream name: %s, stream path: %s\n", app_.c_str(), stream_name_.c_str(), stream_path_.c_str());

	auto server = rtmp_server_.lock();
	if (!server) {
		return false;
	}

    AmfObjects objects; 
    amf_encoder_.reset(); 
    amf_encoder_.encodeString("onStatus", 8);
    amf_encoder_.encodeNumber(0);
    amf_encoder_.encodeObjects(objects);
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetStream.Play.Reset"));
    objects["description"] = AmfObject(std::string("Resetting and playing stream."));
    amf_encoder_.encodeObjects(objects);   
    if(!SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size())) {
        return false;
    }

    objects.clear(); 
    amf_encoder_.reset(); 
    amf_encoder_.encodeString("onStatus", 8);
    amf_encoder_.encodeNumber(0);    
    amf_encoder_.encodeObjects(objects);
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetStream.Play.Start"));
    objects["description"] = AmfObject(std::string("Started playing."));   
    amf_encoder_.encodeObjects(objects);
    if(!SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size())) {
        return false;
    }

    amf_encoder_.reset(); 
    amf_encoder_.encodeString("|RtmpSampleAccess", 17);
    amf_encoder_.encodeBoolean(true);
    amf_encoder_.encodeBoolean(true);
    if(!this->SendNotifyMessage(RTMP_CHUNK_DATA_ID, amf_encoder_.data(), amf_encoder_.size())) {
        return false;
    }
             
    connection_state_ = START_PLAY; 
    
	rtmp_session_ = server->GetSession(stream_path_);
	auto session = rtmp_session_.lock();
    if(session) {
		session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
    }  
    
	if (server) {
		server->NotifyEvent("play.start", stream_path_);
	}

    return true;
}

这个逻辑就是接收到play请求之后,向客户端发送确认包和play.start包

相关参考:
1、https://www.bilibili.com/video/BV1xd4y1W71a/?spm_id_from=333.788&vd_source=e9f028d3dd24949643566e4c76e7009c
2、https://blog.csdn.net/weixin_39399492/article/details/128069969

文章来源:https://blog.csdn.net/qq_32810943/article/details/135577616
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。