相关代码开源在:https://github.com/xuqihong1998/RtmpServer
如果对您有帮助,还请您帮我点个star,谢谢
? RTMP协议是**Real Time Message Protocol(实时信息传输协议)**的缩写,它是由Adobe公司提出的一种应用层的协议,用来解决多媒体数据传输流的多路复用(Multiplexing)和分包(packetizing)的问题。RTMP协议是要靠底层可靠的传输层协议(通常是TCP)来保证信息传输的可靠性的,默认使用端口1935。在基于传输层协议的链接建立完成后,RTMP协议也要客户端和服务器通过“握手”来建立基于传输层链接之上的RTMP Connection链接。RTMP Connection成功后会传输一些控制信息,如CreateStream命令会创建一个Stream链接,用于传输具体的音视频数据和控制这些信息传输的命令信息。
一个 RTMP 连接以握手开始。先进行TCP握手后再进行RTMP握手。RTMP 握手由三个固定长度的块组成,有简单握手和复杂握手两种方式,两种握手方式信息流转的过程是相同的,只是消息中携带的信息不同。以下的讲述和代码实现都基于Rtmp的简单握手来实现。
? 上图是Rtmp建立连接前的握手步骤,不过一般的发送步骤是这样的:
? |client|Server |
? |---C0 + C1---> |
? |<--S0 + S1 + S2--|
? |---- C2 ---->|
#ifndef RTMP_HANDSHAKE_H
#define RTMP_HANDSHAKE_H
#include "../util/HandleBit.h"
#include <muduo/net/Buffer.h>
namespace xqh {
class RtmpHandshake
{
public:
/*
定义了Rtmp握手的等待状态
*/
enum State
{
HANDSHAKE_C0C1,
HANDSHAKE_S0S1S2,
HANDSHAKE_C2,
HANDSHAKE_COMPLETE
};
RtmpHandshake(State state);
virtual ~RtmpHandshake();
int Parse(muduo::net::Buffer& buffer, char* res_buf, uint32_t res_buf_size);
int BuildC0C1(char* buf, uint32_t buf_size);
bool IsCompleted() const
{
return handshake_state_ == HANDSHAKE_COMPLETE;
}
private:
State handshake_state_;
std::shared_ptr<char> packet_rtmp_s1;
};
}
#endif
下面是核心解析的代码,主要是从协议包信息,以及此时的网络握手链接中判断
int RtmpHandshake::Parse(muduo::net::Buffer& buffer, char* res_buf, uint32_t res_buf_size)
{
uint8_t *buf = (uint8_t*)buffer.peek();
uint32_t buf_size = buffer.readableBytes();
uint32_t pos = 0;
uint32_t res_size = 0;
std::random_device rd;
if(handshake_state_ == HANDSHAKE_S0S1S2)
{
if(buf_size < (1 + 1536 + 1536))
{
return res_size;
}
if(buf[0] != RTMP_VERSION)
{
LOG_ERROR << "unsupport rtmp version: " << buf[0] << "\n";
}
pos += 1 + 1536 + 1536;
res_size = 1536;
memcpy(res_buf,buf+1,1536);
handshake_state_ = HANDSHAKE_COMPLETE;
}
else if(handshake_state_ == HANDSHAKE_C0C1)
{
if(buf_size < 1537)
{
return res_size;
}
else
{
if(buf[0] != RTMP_VERSION)
{
return -1;
}
pos += 1537;
res_size = 1+1536+1536;
memset(res_buf,0,1537);
res_buf[0] = RTMP_VERSION;
char* p = res_buf;
p += 9;
char* temp = res_buf;
temp += 1;
for(int i = 0; i < 1528; i++)
{
*p++ = rd();
}
memcpy(p, buf+1,1536);
//缓存s1包
memcpy(packet_rtmp_s1.get(), temp, 1536);
handshake_state_ = HANDSHAKE_C2;
}
}
else if(handshake_state_ == HANDSHAKE_C2)
{
if(buf_size < 1536)
{
return res_size;
}
else if (strncmp(packet_rtmp_s1.get(),buffer.peek(), 1536))
{
pos = 1536;
handshake_state_ = HANDSHAKE_COMPLETE;
LOG_INFO << "handshake complete";
}
else
{
pos = 1536;
handshake_state_ = HANDSHAKE_COMPLETE;
}
}
else
{
return -1;
}
buffer.retrieve(pos);
return res_size;
}
客户端发送握手请求,和服务器完成握手。
客户端发送命令消息中的**“连接”(connect)**到服务器,请求与一个服务应用实例建立连接。
服务器接收到连接命令消息后,发送**确认窗口大小(Window Acknowledgement Size)**到客户端,同时连接到连接命令中提到的应用程序。
服务器发送设置带宽协议(Set Peer Bandwidth)消息到客户端。
客户端处理设置带宽协议控制消息后,发送**确认窗口大小(Window Acknowledgement Size)**到服务器端。
服务器发送用户控制消息中的**“流开始”(StreamBegin)**消息到客户端,通知客户端流成功创建,可用于通信。
服务端发送connect的**“应答消息”(_result)**,通知客户端连接的状态。
客户端发送网络连接命令的**“创建流”(createStream)消息到服务端,以创建消息通信的逻辑通道。音频、视频和元数据的发布通过使用createStream命令创建的流通道**执行。服务端发送createStream的“应答消息”(_result)。
客户端发送网络流命令的**“发布”(publish)**到服务端,将命名流发布到服务器。其它客户端可以使用此流名来播放流,接收发布的音频,视频,以及其他数据消息。
客户端发送命令消息或音视频数据至服务端。
上图表示了一个客户端向服务器推流的过程
? 首先,这块代码展现的是当服务器接收到文件描述符的可读事件时,对buffer里的数据进行解析的流程。主要着重于判断连接握手的成立与否,根据握手的情况对rtmp数据进行解析。
bool RtmpConnection::OnRead(muduo::net::Buffer& buffer)
{
bool ret = true;
if(handshake_->IsCompleted())
{
//这里的话就是握手建立后,对chunk数据进行处理
ret = HandleChunk(buffer);
}
else
{
//这里的话就是对握手逻辑的处理
std::shared_ptr<char> res(new char[4096], std::default_delete<char[]>());
int res_size = handshake_->Parse(buffer, res.get(), 4096);
if(res_size < 0)
{
ret = false;
}
if(res_size > 0)
{
conn_->send(res.get(), res_size);
}
if(handshake_->IsCompleted())
{
if(buffer.readableBytes() > 0)
{
ret = HandleChunk(buffer);
}
if(connection_mode_ == RTMP_PUBLISHER ||
connection_mode_ == RTMP_CLIENT)
{
this->SetChunkSize();
this->Connect();
}
}
}
return ret;
}
? 接着,解析chunk数据包,这里只展现了大致的流程
bool RtmpConnection::HandleChunk(muduo::net::Buffer& buffer)
{
int ret = -1;
do
{
RtmpMessage rtmp_msg;
ret = rtmp_chunk_->Parse(buffer, rtmp_msg);
if(ret >= 0)
{
if(rtmp_msg.IsCompleted())
{
if(!HandleMessage(rtmp_msg))
{
return false;
}
}
if(ret == 0)
{
break;
}
}
else if(ret < 0)
{
return false;
}
} while (buffer.readableBytes() > 0);
return true;
}
? 接着,将chunk序列化为我们自己定义的消息对象之后,根据消息对象的消息类型,对数据进行下一步的处理
bool RtmpConnection::HandleMessage(RtmpMessage& rtmp_msg)
{
bool ret = true;
switch (rtmp_msg.type_id)
{
case RTMP_VIDEO:
ret = HandleVideo(rtmp_msg);
break;
case RTMP_AUDIO:
ret = HandleAudio(rtmp_msg);
break;
case RTMP_INVOKE:
ret = HandleInvoke(rtmp_msg);
break;
case RTMP_NOTIFY:
ret = HandleNotify(rtmp_msg);
break;
case RTMP_FLEX_MESSAGE:
LOG_INFO << "unsupported rtmp flex message.\n";
ret = false;
break;
case RTMP_SET_CHUNK_SIZE:
rtmp_chunk_->SetInChunkSize(ReadUint32BE(rtmp_msg.payload.get()));
break;
case RTMP_BANDWIDTH_SIZE:
break;
case RTMP_FLASH_VIDEO:
LOG_INFO << "unsupported rtmp flash video.\n";
ret = false;
break;
case RTMP_ACK:
break;
case RTMP_ACK_SIZE:
break;
case RTMP_USER_EVENT:
break;
default:
LOG_INFO << "unkown message type: " << rtmp_msg.type_id << "\n";
break;
}
return ret;
}
? 从rtmp包中的payload数据中,取出amf数据,amf数据的第一行就代表要处理的命令信息
bool RtmpConnection::HandleInvoke(RtmpMessage& rtmp_msg)
{
bool ret = true;
amf_decoder_.reset();
int bytes_used = amf_decoder_.decode((const char*)rtmp_msg.payload.get(), rtmp_msg.length, 1);
if(bytes_used < 0)
{
return false;
}
std::string method = amf_decoder_.getString();
if(connection_mode_ == RTMP_PUBLISHER || connection_mode_ == RTMP_CLIENT)
{
bytes_used += amf_decoder_.decode(rtmp_msg.payload.get() + bytes_used, rtmp_msg.length - bytes_used);
if(method == "_result")
{
ret = HandleResult(rtmp_msg);
}
else if(method == "onStatus")
{
ret = HandleOnStatus(rtmp_msg);
}
}
else if(connection_mode_ == RTMP_SERVER)
{
if(rtmp_msg.stream_id == 0)
{
bytes_used += amf_decoder_.decode(rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
if(method == "connect")
{
ret = HandleConnect();
}
else if(method == "createStream")
{
ret = HandleCreateStream();
}
}
else if(rtmp_msg.stream_id == stream_id_)
{
bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 3);
stream_name_ = amf_decoder_.getString();
stream_path_ = "/" + app_ + "/" + stream_name_;
if((int)rtmp_msg.length > bytes_used)
{
bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
}
if(method == "publish")
{
ret = HandlePublish();
}
else if(method == "play")
{
ret = HandlePlay();
}
else if(method == "play2")
{
ret = HandlePlay2();
}
else if(method == "DeleteStream")
{
ret = HandleDeleteStream();
}
else if(method == "releaseStream")
{
}
}
}
return ret;
}
bool RtmpConnection::HandleConnect()
{
if(!amf_decoder_.hasObject("app")) {
return false;
}
AmfObject amfObj = amf_decoder_.getObject("app");
app_ = amfObj.amf_string;
if(app_ == "") {
return false;
}
SendAcknowledgement();
SetPeerBandwidth();
SetChunkSize();
AmfObjects objects;
amf_encoder_.reset();
amf_encoder_.encodeString("_result", 7);
amf_encoder_.encodeNumber(amf_decoder_.getNumber());
objects["fmsVer"] = AmfObject(std::string("FMS/4,5,0,297"));
objects["capabilities"] = AmfObject(255.0);
objects["mode"] = AmfObject(1.0);
amf_encoder_.encodeObjects(objects);
objects.clear();
objects["level"] = AmfObject(std::string("status"));
objects["code"] = AmfObject(std::string("NetConnection.Connect.Success"));
objects["description"] = AmfObject(std::string("Connection succeeded."));
objects["objectEncoding"] = AmfObject(0.0);
amf_encoder_.encodeObjects(objects);
SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());
return true;
}
接着,向推流端(client)发送发送确认窗口大小(Window Acknowledgement Size)、设置带宽协议(Set Peer Bandwidth)消息和设置窗口大小(streamBegin),发送完毕则发送connected的应答(_result)信息。
bool RtmpConnection::HandleCreateStream()
{
int stream_id = rtmp_chunk_->GetStreamId();
AmfObjects objects;
amf_encoder_.reset();
amf_encoder_.encodeString("_result", 7);
amf_encoder_.encodeNumber(amf_decoder_.getNumber());
amf_encoder_.encodeObjects(objects);
amf_encoder_.encodeNumber(stream_id);
SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());
stream_id_ = stream_id;
return true;
}
这个就比较简单了,就是从amf中获取当前stream的id,然后再回传确认包
首先先从rtmp包中解析出要推的流的名称
else if(rtmp_msg.stream_id == stream_id_)
{
bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 3);
stream_name_ = amf_decoder_.getString();
stream_path_ = "/" + app_ + "/" + stream_name_;
if((int)rtmp_msg.length > bytes_used)
{
bytes_used += amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
}
if(method == "publish")
{
ret = HandlePublish();
}
......
}
根据上图可以看到 在payload中的amf第四行代表要推的流的名称,此时将其解析出来
bool RtmpConnection::HandlePublish()
{
std::cout << "[Publish] app: "<< app_.c_str()<< " stream name: " << stream_name_.c_str() <<"stream path: " << stream_path_.c_str() << std::endl;
auto server = rtmp_server_.lock();
if (!server) {
std::cout << "UnFound Server!" << std::endl;
return false;
}
AmfObjects objects;
amf_encoder_.reset();
amf_encoder_.encodeString("onStatus", 8);
amf_encoder_.encodeNumber(0);
amf_encoder_.encodeObjects(objects);
bool is_error = false;
if(server->HasPublisher(stream_path_)) {
is_error = true;
objects["level"] = AmfObject(std::string("error"));
objects["code"] = AmfObject(std::string("NetStream.Publish.BadName"));
objects["description"] = AmfObject(std::string("Stream already publishing."));
std::cout << "Stream already publishing." << std::endl;
}
else if(connection_state_ == START_PUBLISH) {
is_error = true;
objects["level"] = AmfObject(std::string("error"));
objects["code"] = AmfObject(std::string("NetStream.Publish.BadConnection"));
objects["description"] = AmfObject(std::string("Connection already publishing."));
std::cout << "Connection already publishing." << std::endl;
}
/* else if(0) {
// 认证处理
} */
else {
objects["level"] = AmfObject(std::string("status"));
objects["code"] = AmfObject(std::string("NetStream.Publish.Start"));
objects["description"] = AmfObject(std::string("Start publising."));
//通过session来管理
server->AddSession(stream_path_);
rtmp_session_ = server->GetSession(stream_path_);
std::cout << "publsh: " << stream_path_ << std::endl;
if (server) {
server->NotifyEvent("publish.start", stream_path_);
}
}
amf_encoder_.encodeObjects(objects);
SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size());
if(is_error) {
// Close ?
}
else {
connection_state_ = START_PUBLISH;
is_publishing_ = true;
}
auto session = rtmp_session_.lock();
if(session) {
session->SetGopCache(max_gop_cache_len_);
session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
}
return true;
}
根据服务器此时的连接状态 做相应的处理
通过RtmpSession这个类,管理stream_path所对应的处理逻辑。每个stream_path和一个session对应,stream_path是唯一的。
接下来,发送一个回传包,表示说开始处理消息,服务器开始推流。
这里的数据消息信息主要包含后续音频视频的编解码信息,服务器和客户端后续可以通过这些元数据对数据进行编解码。
客户端在发送publish消息之后,会给服务器发送setDataFrame和onMetaData消息
bool RtmpConnection::HandleNotify(RtmpMessage& rtmp_msg)
{
amf_decoder_.reset();
int bytes_used = amf_decoder_.decode((const char *)rtmp_msg.payload.get(), rtmp_msg.length, 1);
if(bytes_used < 0)
{
return false;
}
if(amf_decoder_.getString() == "@setDataFrame")
{
amf_decoder_.reset();
bytes_used = amf_decoder_.decode((const char *)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used, 1);
if(bytes_used < 0)
{
return false;
}
if(amf_decoder_.getString() == "onMetaData")
{
amf_decoder_.decode((const char*)rtmp_msg.payload.get()+bytes_used, rtmp_msg.length-bytes_used);
meta_data_ = amf_decoder_.getObjects();
auto server = rtmp_server_.lock();
if(!server)
{
return false;
}
auto session = rtmp_session_.lock();
if(session)
{
session->SetMetaData(meta_data_);
session->SendMetaData(meta_data_);
}
}
}
return true;
}
在连接正式建立之后,就可以处理音视频信息了
1、每一个message就是一帧数据。对于flv的tag而言,就是对应rtmp每个message,一个tag就是一个message,是一一对应的关系;相当于每一个tag都封装成一个message。
2、RTMP 块流使用Message Type ID=8 作为音频数据,flv的tag header->tag type也用8来表示音频。通常音频流的csid是4(也可以自定义),音频流的每一个chunk的csid都是相同的。
3、RTMP 块流使用Message Type ID=9 作为视频数据,flv的tag header->tag type也用9来表示音频。通常视频流的csid是6(也可以自定义),视频流的每一个chunk的csid都是相同的。
上图表示了一个客户端向服务器拉流的过程
与3.3.1相同
与3.3.3相同
bool RtmpConnection::HandlePlay()
{
//LOG_INFO("[Play] app: %s, stream name: %s, stream path: %s\n", app_.c_str(), stream_name_.c_str(), stream_path_.c_str());
auto server = rtmp_server_.lock();
if (!server) {
return false;
}
AmfObjects objects;
amf_encoder_.reset();
amf_encoder_.encodeString("onStatus", 8);
amf_encoder_.encodeNumber(0);
amf_encoder_.encodeObjects(objects);
objects["level"] = AmfObject(std::string("status"));
objects["code"] = AmfObject(std::string("NetStream.Play.Reset"));
objects["description"] = AmfObject(std::string("Resetting and playing stream."));
amf_encoder_.encodeObjects(objects);
if(!SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size())) {
return false;
}
objects.clear();
amf_encoder_.reset();
amf_encoder_.encodeString("onStatus", 8);
amf_encoder_.encodeNumber(0);
amf_encoder_.encodeObjects(objects);
objects["level"] = AmfObject(std::string("status"));
objects["code"] = AmfObject(std::string("NetStream.Play.Start"));
objects["description"] = AmfObject(std::string("Started playing."));
amf_encoder_.encodeObjects(objects);
if(!SendInvokeMessage(RTMP_CHUNK_INVOKE_ID, amf_encoder_.data(), amf_encoder_.size())) {
return false;
}
amf_encoder_.reset();
amf_encoder_.encodeString("|RtmpSampleAccess", 17);
amf_encoder_.encodeBoolean(true);
amf_encoder_.encodeBoolean(true);
if(!this->SendNotifyMessage(RTMP_CHUNK_DATA_ID, amf_encoder_.data(), amf_encoder_.size())) {
return false;
}
connection_state_ = START_PLAY;
rtmp_session_ = server->GetSession(stream_path_);
auto session = rtmp_session_.lock();
if(session) {
session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
}
if (server) {
server->NotifyEvent("play.start", stream_path_);
}
return true;
}
这个逻辑就是接收到play请求之后,向客户端发送确认包和play.start包
相关参考:
1、https://www.bilibili.com/video/BV1xd4y1W71a/?spm_id_from=333.788&vd_source=e9f028d3dd24949643566e4c76e7009c
2、https://blog.csdn.net/weixin_39399492/article/details/128069969