/*
 * RtmpProtocol.cpp
 *
 *  Created on: Feb 7, 2017
 *      Author: xzl
 */
#include "RtmpProtocol.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/utils.h"
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"

using namespace ZL::Util;

namespace ZL {
namespace Rtmp {

/**
 * Creates a protocol object whose first parsing step is handle_C0C1(),
 * i.e. it initially expects the client side of the handshake.
 * startClientSession() switches it to the client role instead.
 */
RtmpProtocol::RtmpProtocol() {
    m_nextHandle = [this]() {
        handle_C0C1();
    };
}

RtmpProtocol::~RtmpProtocol() {
    clear();
}

/**
 * Resets all per-connection state to its initial values: chunk sizes,
 * acknowledgement counters, peer bandwidth, per-chunk-stream packet state,
 * the invoke request counter, the receive buffer and the parser step
 * (back to handle_C0C1()).
 */
void RtmpProtocol::clear() {
    //////////// ChunkSize ////////////
    m_iChunkLenIn = DEFAULT_CHUNK_LEN;
    m_iChunkLenOut = DEFAULT_CHUNK_LEN;
    //////////// Acknowledgement ////////////
    m_ui32ByteSent = 0;
    m_ui32LastSent = 0;
    m_ui32WinSize = 0;
    //////////// PeerBandwidth ////////////
    m_ui32Bandwidth = 2500000;
    m_ui8LimitType = 2;
    //////////// Chunk ////////////
    m_mapChunkData.clear();
    m_iNowStreamID = 0;
    m_iNowChunkID = 0;
    //////////// Invoke request ////////////
    m_iReqID = 0;
    //////////// Rtmp parser ////////////
    m_strRcvBuf.clear();
    m_ui32StreamId = STREAM_CONTROL;
    m_nextHandle = [this]() {
        handle_C0C1();
    };
}

/**
 * Sends a MSG_ACK message whose 4-byte payload is ui32Size in network
 * byte order.
 */
void RtmpProtocol::sendAcknowledgement(uint32_t ui32Size) {
    uint32_t sequence = htonl(ui32Size);
    std::string control(reinterpret_cast<const char *>(&sequence), 4);
    sendRequest(MSG_ACK, control);
}

/**
 * Sends a MSG_WIN_SIZE message whose 4-byte payload is ui32Size in network
 * byte order.
 */
void RtmpProtocol::sendAcknowledgementSize(uint32_t ui32Size) {
    uint32_t windowSize = htonl(ui32Size);
    std::string set_windowSize(reinterpret_cast<const char *>(&windowSize), 4);
    sendRequest(MSG_WIN_SIZE, set_windowSize);
}

/**
 * Sends a MSG_SET_PEER_BW message: ui32Size in network byte order followed
 * by a single limit-type byte, which is always 0x02 here.
 */
void RtmpProtocol::sendPeerBandwidth(uint32_t ui32Size) {
    uint32_t peerBandwidth = htonl(ui32Size);
    std::string set_peerBandwidth(reinterpret_cast<const char *>(&peerBandwidth), 4);
    set_peerBandwidth.push_back((char) 0x02);
    sendRequest(MSG_SET_PEER_BW, set_peerBandwidth);
}

/**
 * Sends a MSG_SET_CHUNK message announcing ui32Size and then starts using
 * that size for outgoing chunks. The announcement itself is still split
 * using the previous outgoing chunk size.
 */
void RtmpProtocol::sendChunkSize(uint32_t ui32Size) {
    uint32_t len = htonl(ui32Size);
    std::string set_chunk(reinterpret_cast<const char *>(&len), 4);
    sendRequest(MSG_SET_CHUNK, set_chunk);
    m_iChunkLenOut = ui32Size;
}

/** Sends a CONTROL_PING_REQUEST user control event carrying ui32TimeStamp. */
void RtmpProtocol::sendPingRequest(uint32_t ui32TimeStamp) {
    sendUserControl(CONTROL_PING_REQUEST, ui32TimeStamp);
}

/** Sends a CONTROL_PING_RESPONSE user control event carrying ui32TimeStamp. */
void RtmpProtocol::sendPingResponse(uint32_t ui32TimeStamp) {
    sendUserControl(CONTROL_PING_RESPONSE, ui32TimeStamp);
}

/**
 * Sends a CONTROL_SETBUFFER user control event whose data is the stream id
 * followed by the buffer length, both as 4-byte network-order integers.
 */
void RtmpProtocol::sendSetBufferLength(uint32_t ui32StreamId,
        uint32_t ui32Length) {
    std::string control;
    ui32StreamId = htonl(ui32StreamId);
    control.append(reinterpret_cast<const char *>(&ui32StreamId), 4);
    ui32Length = htonl(ui32Length);
    control.append(reinterpret_cast<const char *>(&ui32Length), 4);
    sendUserControl(CONTROL_SETBUFFER, control);
}

/**
 * Sends a MSG_USER_CONTROL message: a 2-byte network-order event type
 * followed by a 4-byte network-order event value.
 */
void RtmpProtocol::sendUserControl(uint16_t ui16EventType,
        uint32_t ui32EventData) {
    std::string control;
    uint16_t type = htons(ui16EventType);
    control.append(reinterpret_cast<const char *>(&type), 2);
    uint32_t data = htonl(ui32EventData);
    control.append(reinterpret_cast<const char *>(&data), 4);
    sendRequest(MSG_USER_CONTROL, control);
}

/**
 * Sends a MSG_USER_CONTROL message: a 2-byte network-order event type
 * followed by the caller-supplied, already encoded event data.
 */
void RtmpProtocol::sendUserControl(uint16_t ui16EventType,
        const string& strEventData) {
    std::string control;
    uint16_t type = htons(ui16EventType);
    control.append(reinterpret_cast<const char *>(&type), 2);
    control.append(strEventData);
    sendRequest(MSG_USER_CONTROL, control);
}

/**
 * Sends a message of type iType on the stream id and chunk stream id of the
 * most recently completed incoming message, with timestamp 0.
 */
void RtmpProtocol::sendResponse(int iType, const string& str) {
    sendRtmp(iType, m_iNowStreamID, str, 0, m_iNowChunkID);
}

/**
 * Sends a MSG_CMD message: AMF-encoded command name, the next request id
 * (the counter is incremented first) and the argument value.
 */
void RtmpProtocol::sendInvoke(const string& strCmd, const AMFValue& val) {
    AMFEncoder enc;
    enc << strCmd << ++m_iReqID << val;
    sendRequest(MSG_CMD, enc.data());
}

/**
 * Sends a message of type iCmd on the current stream id (m_ui32StreamId)
 * using the CHUNK_SERVER_REQUEST chunk stream, with timestamp 0.
 */
void RtmpProtocol::sendRequest(int iCmd, const string& str) {
    sendRtmp(iCmd, m_ui32StreamId, str, 0, CHUNK_SERVER_REQUEST);
}

/**
 * Serializes one message into chunks and hands the bytes to onSendRawData().
 *
 * The first chunk carries a full 12-byte header; every following chunk
 * carries a 1-byte header with format bits 3. When the timestamp does not
 * fit in 24 bits, the 24-bit field is set to 0xFFFFFF and the 4-byte
 * extended timestamp is written in front of every chunk payload.
 *
 * @param ui8Type       message type id
 * @param ui32StreamId  message stream id
 * @param strBuf        message payload
 * @param ui32TimeStamp message timestamp
 * @param iChunkId      chunk stream id; only the one-byte form (2..63) is
 *                      supported
 * @throws std::runtime_error if iChunkId is outside 2..63
 */
void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
        const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) {
    if (iChunkId < 2 || iChunkId > 63) {
        auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl;
        throw std::runtime_error(strErr);
    }

    bool bExtStamp = ui32TimeStamp >= 0xFFFFFF;
    RtmpHeader header;
    header.flags = (iChunkId & 0x3f) | (0 << 6);
    header.typeId = ui8Type;
    set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp);
    set_be24(header.bodySize, strBuf.size());
    set_le32(header.streamId, ui32StreamId);

    std::string strSend;
    strSend.append(reinterpret_cast<const char *>(&header), sizeof header);

    char acExtStamp[4];
    if (bExtStamp) {
        // extended timestamp
        set_be32(acExtStamp, ui32TimeStamp);
    }

    size_t pos = 0;
    while (pos < strBuf.size()) {
        if (pos) {
            // continuation chunk: 1-byte header, format bits 3
            uint8_t flags = (iChunkId & 0x3f) | (3 << 6);
            strSend += char(flags);
        }
        if (bExtStamp) {
            // extended timestamp
            strSend.append(acExtStamp, 4);
        }
        size_t chunk = min(m_iChunkLenOut, strBuf.size() - pos);
        strSend.append(strBuf, pos, chunk);
        pos += chunk;
    }

    onSendRawData(strSend.data(), strSend.size());
    m_ui32ByteSent += strSend.size();

    // Sending an acknowledgement re-enters this function. Without the type
    // check, a peer-announced window no larger than one ACK message would make
    // every ACK trigger another ACK and recurse without bound.
    if (ui8Type != MSG_ACK && m_ui32WinSize > 0
            && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) {
        m_ui32LastSent = m_ui32ByteSent;
        sendAcknowledgement(m_ui32ByteSent);
    }
}

/**
 * Appends newly received bytes to the receive buffer and runs the current
 * parser step.
 */
void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) {
    m_strRcvBuf.append(pcRawData, iSize);
    // Invoke a copy: the step may replace m_nextHandle while it is running.
    auto cb = m_nextHandle;
    cb();
}

//// for client ////

/**
 * Starts the client side of the handshake: sends the plaintext version byte
 * followed by a handshake block built from the current time, then waits for
 * S0+S1+S2. callBack is invoked once the handshake has completed.
 *
 * NOTE(review): the parameter type appears here as a bare `function`; it is
 * presumably `function<void()>` as declared in the header - confirm.
 */
void RtmpProtocol::startClientSession(const function &callBack) {
    // send C0 + C1
    char handshake_head = HANDSHAKE_PLAINTEXT;
    onSendRawData(&handshake_head, 1);
    RtmpHandshake c1(::time(nullptr));
    onSendRawData(reinterpret_cast<const char *>(&c1), sizeof(RtmpHandshake));
    m_nextHandle = [this, callBack]() {
        // wait for S0 + S1 + S2
        handle_S0S1S2(callBack);
    };
}

/**
 * Client handshake step: once S0+S1+S2 are fully buffered, echoes the first
 * received handshake block back as C2, drops the handshake bytes, switches
 * to chunk parsing and invokes callBack.
 *
 * @throws std::runtime_error if the first byte is not HANDSHAKE_PLAINTEXT
 */
void RtmpProtocol::handle_S0S1S2(const function &callBack) {
    if (m_strRcvBuf.size() < 1 + 2 * sizeof(RtmpHandshake)) {
        // need more data
        return;
    }
    if (m_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) {
        throw std::runtime_error("only plaintext[0x03] handshake supported");
    }
    // send C2: the handshake block that directly follows the version byte
    const char *pcC2 = m_strRcvBuf.data() + 1;
    onSendRawData(pcC2, sizeof(RtmpHandshake));
    m_strRcvBuf.erase(0, 1 + 2 * sizeof(RtmpHandshake));
    // handshake finished: start parsing chunks
    m_nextHandle = [this]() {
        handle_rtmp();
    };
    callBack();
}

//// for server ////

/**
 * Server handshake step: once C0+C1 are fully buffered, replies with the
 * plaintext version byte, a locally built handshake block and an echo of the
 * received C1 block, then waits for C2.
 *
 * @throws std::runtime_error if the first byte is not HANDSHAKE_PLAINTEXT
 */
void RtmpProtocol::handle_C0C1() {
    if (m_strRcvBuf.size() < 1 + sizeof(RtmpHandshake)) {
        // need more data
        return;
    }
    if (m_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) {
        throw std::runtime_error("only plaintext[0x03] handshake supported");
    }
    // version byte
    char handshake_head = HANDSHAKE_PLAINTEXT;
    onSendRawData(&handshake_head, 1);
    // locally generated handshake block
    RtmpHandshake s1(0);
    onSendRawData(reinterpret_cast<const char *>(&s1), sizeof(RtmpHandshake));
    // echo of the handshake block received from the client
    onSendRawData(m_strRcvBuf.c_str() + 1, sizeof(RtmpHandshake));
    m_strRcvBuf.erase(0, 1 + sizeof(RtmpHandshake));
    // wait for C2
    m_nextHandle = [this]() {
        handle_C2();
    };
}

/**
 * Server handshake step: discards C2 once it is fully buffered (its content
 * is not inspected), parses any bytes that arrived behind it and switches to
 * chunk parsing.
 */
void RtmpProtocol::handle_C2() {
    if (m_strRcvBuf.size() < sizeof(RtmpHandshake)) {
        // need more data
        return;
    }
    m_strRcvBuf.erase(0, sizeof(RtmpHandshake));
    // handshake finished: enter command mode
    if (!m_strRcvBuf.empty()) {
        handle_rtmp();
    }
    m_nextHandle = [this]() {
        handle_rtmp();
    };
}

/**
 * Parses as many complete chunks as the receive buffer holds. Each chunk's
 * payload is appended to the packet of its chunk stream; when a packet
 * reaches its announced body size it is passed to handle_rtmpChunk().
 *
 * Header fields are parsed into locals and written to the per-chunk-stream
 * state only when the whole chunk (header and payload) is buffered. A chunk
 * that arrives across several onParseRtmp() calls has its header re-parsed on
 * every call; committing early would add a timestamp delta more than once.
 *
 * @throws std::runtime_error if the body size is zero or smaller than the
 *         payload already collected for the packet
 */
void RtmpProtocol::handle_rtmp() {
    while (!m_strRcvBuf.empty()) {
        uint8_t flags = m_strRcvBuf[0];
        int iOffset = 0;
        static const size_t HEADER_LENGTH[] = { 12, 8, 4, 1 };
        size_t iHeaderLen = HEADER_LENGTH[flags >> 6];
        m_iNowChunkID = flags & 0x3f;
        switch (m_iNowChunkID) {
        case 0: {
            // Value 0 selects the two-byte form, ids 64 - 319
            // (second byte + 64).
            if (m_strRcvBuf.size() < 2) {
                // need more data
                return;
            }
            m_iNowChunkID = 64 + (uint8_t) (m_strRcvBuf[1]);
            iOffset = 1;
        }
            break;
        case 1: {
            // Value 1 selects the three-byte form, ids 64 - 65599
            // (third byte * 256 + second byte + 64).
            if (m_strRcvBuf.size() < 3) {
                // need more data
                return;
            }
            m_iNowChunkID = 64 + ((uint8_t) (m_strRcvBuf[2]) << 8)
                    + (uint8_t) (m_strRcvBuf[1]);
            iOffset = 2;
        }
            break;
        default:
            // Chunk stream id 2 is reserved for low-level protocol control
            // messages and commands.
            break;
        }

        if (m_strRcvBuf.size() < iHeaderLen + iOffset) {
            // need more data
            return;
        }
        // The header struct is overlaid so that its flags byte coincides with
        // the last byte of the (1 to 3 byte) basic header.
        RtmpHeader &header = *((RtmpHeader *) (m_strRcvBuf.data() + iOffset));
        auto &chunkData = m_mapChunkData[m_iNowChunkID];

        // Start from the state left by the previous chunk of this chunk
        // stream; shorter headers inherit whatever they do not carry.
        auto pendingStreamId = chunkData.streamId;
        auto pendingBodySize = chunkData.bodySize;
        auto pendingTypeId = chunkData.typeId;
        auto pendingExtStamp = chunkData.extStamp;
        auto pendingTimeStamp = chunkData.timeStamp;

        switch (iHeaderLen) {
        case 12:
            pendingStreamId = load_le32(header.streamId);
            // fall through
        case 8:
            pendingBodySize = load_be24(header.bodySize);
            pendingTypeId = header.typeId;
            // fall through
        case 4: {
            uint32_t ts = load_be24(header.timeStamp);
            if (ts == 0xFFFFFF) {
                pendingExtStamp = true;
            } else {
                pendingExtStamp = false;
                // A 12-byte header carries an absolute timestamp, the shorter
                // ones a delta relative to the previous value.
                pendingTimeStamp = ts + ((iHeaderLen == 12) ? 0 : chunkData.timeStamp);
            }
        }
            break;
        default:
            // 1-byte header: everything is inherited.
            break;
        }

        if (pendingExtStamp) {
            if (m_strRcvBuf.size() < iHeaderLen + iOffset + 4) {
                // need more data
                return;
            }
            pendingTimeStamp = load_be32(m_strRcvBuf.data() + iOffset + iHeaderLen);
            iOffset += 4;
        }

        if (pendingBodySize == 0 || pendingBodySize < chunkData.strBuf.size()) {
            throw std::runtime_error("非法的bodySize");
        }

        auto iMore = min(m_iChunkLenIn, pendingBodySize - chunkData.strBuf.size());
        if (m_strRcvBuf.size() < iHeaderLen + iOffset + iMore) {
            // need more data
            return;
        }

        // The whole chunk is available: commit the header exactly once.
        chunkData.chunkId = m_iNowChunkID;
        chunkData.streamId = pendingStreamId;
        chunkData.bodySize = pendingBodySize;
        chunkData.typeId = pendingTypeId;
        chunkData.extStamp = pendingExtStamp;
        chunkData.timeStamp = pendingTimeStamp;

        chunkData.strBuf.append(m_strRcvBuf, iHeaderLen + iOffset, iMore);
        m_strRcvBuf.erase(0, iHeaderLen + iOffset + iMore);

        if (chunkData.strBuf.size() == chunkData.bodySize) {
            // message complete
            m_iNowStreamID = chunkData.streamId;
            handle_rtmpChunk(chunkData);
            chunkData.strBuf.clear();
        }
    }
}

/**
 * Dispatches one complete message. Protocol control and user control
 * messages are handled here; every other type is forwarded to onRtmpChunk().
 * User control handling strips the 2-byte event type from the packet buffer.
 *
 * @throws std::runtime_error if a control payload is shorter than the handler
 *         needs, if the peer announces a chunk size of 0, or for
 *         MSG_AGGREGATE, which is not supported
 */
void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunkData) {
    switch (chunkData.typeId) {
    case MSG_ACK: {
        if (chunkData.strBuf.size() < 4) {
            throw std::runtime_error("MSG_ACK: Not enough data");
        }
        //auto bytePeerRecv = load_be32(&chunkData.strBuf[0]);
        //TraceL << "MSG_ACK:" << bytePeerRecv;
    }
        break;
    case MSG_SET_CHUNK: {
        if (chunkData.strBuf.size() < 4) {
            throw std::runtime_error("MSG_SET_CHUNK :Not enough data");
        }
        uint32_t ui32ChunkLen = load_be32(&chunkData.strBuf[0]);
        if (ui32ChunkLen == 0) {
            // With a zero chunk size the parser could never collect payload.
            throw std::runtime_error("MSG_SET_CHUNK: invalid chunk size");
        }
        m_iChunkLenIn = ui32ChunkLen;
        TraceL << "MSG_SET_CHUNK:" << m_iChunkLenIn;
    }
        break;
    case MSG_USER_CONTROL: {
        // user control message
        if (chunkData.strBuf.size() < 2) {
            throw std::runtime_error("MSG_USER_CONTROL: Not enough data.");
        }
        uint16_t event_type = load_be16(&chunkData.strBuf[0]);
        chunkData.strBuf.erase(0, 2);
        switch (event_type) {
        case CONTROL_PING_REQUEST: {
            if (chunkData.strBuf.size() < 4) {
                throw std::runtime_error("CONTROL_PING_REQUEST: Not enough data.");
            }
            uint32_t timeStamp = load_be32(&chunkData.strBuf[0]);
            //TraceL << "CONTROL_PING_REQUEST:" << timeStamp;
            sendUserControl(CONTROL_PING_RESPONSE, timeStamp);
        }
            break;
        case CONTROL_PING_RESPONSE: {
            if (chunkData.strBuf.size() < 4) {
                throw std::runtime_error("CONTROL_PING_RESPONSE: Not enough data.");
            }
            //uint32_t timeStamp = load_be32(&chunkData.strBuf[0]);
            //TraceL << "CONTROL_PING_RESPONSE:" << timeStamp;
        }
            break;
        case CONTROL_STREAM_BEGIN: {
            // playback started
            if (chunkData.strBuf.size() < 4) {
                throw std::runtime_error("CONTROL_STREAM_BEGIN: Not enough data.");
            }
            uint32_t streamId = load_be32(&chunkData.strBuf[0]);
            onStreamBegin(streamId);
            TraceL << "CONTROL_STREAM_BEGIN:" << streamId;
        }
            break;
        case CONTROL_STREAM_EOF: {
            // paused
            if (chunkData.strBuf.size() < 4) {
                throw std::runtime_error("CONTROL_STREAM_EOF: Not enough data.");
            }
            uint32_t streamId = load_be32(&chunkData.strBuf[0]);
            onStreamEof(streamId);
            TraceL << "CONTROL_STREAM_EOF:" << streamId;
        }
            break;
        case CONTROL_STREAM_DRY: {
            // playback stopped
            if (chunkData.strBuf.size() < 4) {
                throw std::runtime_error("CONTROL_STREAM_DRY: Not enough data.");
            }
            uint32_t streamId = load_be32(&chunkData.strBuf[0]);
            onStreamDry(streamId);
            TraceL << "CONTROL_STREAM_DRY:" << streamId;
        }
            break;
        default:
            //WarnL << "unhandled user control:" << event_type;
            break;
        }
    }
        break;
    case MSG_WIN_SIZE: {
        if (chunkData.strBuf.size() < 4) {
            throw std::runtime_error("MSG_WIN_SIZE: Not enough data");
        }
        m_ui32WinSize = load_be32(&chunkData.strBuf[0]);
        TraceL << "MSG_WIN_SIZE:" << m_ui32WinSize;
    }
        break;
    case MSG_SET_PEER_BW: {
        if (chunkData.strBuf.size() < 4) {
            throw std::runtime_error("MSG_SET_PEER_BW: Not enough data");
        }
        m_ui32Bandwidth = load_be32(&chunkData.strBuf[0]);
        // The limit-type byte follows the bandwidth; a payload that omits it
        // keeps yielding 0, as indexing the string terminator did before.
        m_ui8LimitType = chunkData.strBuf.size() > 4 ? chunkData.strBuf[4] : 0;
        TraceL << "MSG_SET_PEER_BW:" << m_ui32Bandwidth;
    }
        break;
    case MSG_AGGREGATE:
        throw std::runtime_error("streaming FLV not supported");
        break;
    default:
        onRtmpChunk(chunkData);
        break;
    }
}

} /* namespace Rtmp */
} /* namespace ZL */