diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 80be7bfe..d0c49535 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -46,6 +46,7 @@ using namespace ZL::Util; #define CHUNK_SERVER_REQUEST 2 /*服务器像客户端发出请求时的chunkID*/ #define CHUNK_CLIENT_REQUEST_BEFORE 3 /*客户端在createStream前,向服务器发出请求的chunkID*/ #define CHUNK_CLIENT_REQUEST_AFTER 4 /*客户端在createStream后,向服务器发出请求的chunkID*/ +#define CHUNK_MEDIA 6 /*媒体chunkID*/ #define FLV_KEY_FRAME 1 #define FLV_INTER_FRAME 2 @@ -62,7 +63,7 @@ public: } } uint8_t timeStamp[4]; - uint8_t zero[4] = { 0 }; + uint8_t zero[4] = {0}; uint8_t random[RANDOM_LEN]; private: void random_generate(char* bytes, int size) { diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 35e0b2b8..f7d3dfa5 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -115,7 +115,10 @@ void RtmpProtocol::sendUserControl(uint16_t ui16EventType, } void RtmpProtocol::sendResponse(int iType, const string& str) { - sendRtmp(iType, m_iNowStreamID, str, 0, m_iNowChunkID); + if(!m_bDataStarted && (iType == MSG_DATA)){ + m_bDataStarted = true; + } + sendRtmp(iType, m_iNowStreamID, str, 0, m_bDataStarted ? CHUNK_CLIENT_REQUEST_AFTER : CHUNK_CLIENT_REQUEST_BEFORE); } void RtmpProtocol::sendInvoke(const string& strCmd, const AMFValue& val) { @@ -218,11 +221,12 @@ void RtmpProtocol::handle_C0C1() { throw std::runtime_error("only plaintext[0x03] handshake supported"); } char handshake_head = HANDSHAKE_PLAINTEXT; + //发送S0 onSendRawData(&handshake_head, 1); - //发送S2 + //发送S1 RtmpHandshake s2(0); onSendRawData((char *) &s2, sizeof(RtmpHandshake)); - //发送S0S1 + //发送S2 onSendRawData(m_strRcvBuf.c_str() + 1, sizeof(RtmpHandshake)); m_strRcvBuf.erase(0, 1 + sizeof(RtmpHandshake)); //等待C2 diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 23ae5e54..e88eb036 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -61,6 +61,9 @@ protected: protected: int m_iReqID = 0; uint32_t m_ui32StreamId = STREAM_CONTROL; + int m_iNowStreamID = 0; + int m_iNowChunkID = 0; + bool m_bDataStarted = false; private: void handle_S0S1S2(const function &cb); void handle_C0C1(); @@ -80,8 +83,6 @@ private: uint8_t m_ui8LimitType = 2; ////////////Chunk//////////// unordered_map m_mapChunkData; - int m_iNowStreamID = 0; - int m_iNowChunkID = 0; //////////Rtmp parser////////// string m_strRcvBuf; function m_nextHandle; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index b5491f56..0e5d7085 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -65,30 +65,40 @@ void RtmpSession::onRecv(const Socket::Buffer::Ptr &pBuf) { void RtmpSession::onCmd_connect(AMFDecoder &dec) { auto params = dec.load(); + double amfVer = 0; + AMFValue objectEncoding = params["objectEncoding"]; + if(objectEncoding){ + amfVer = objectEncoding.as_number(); + } + ///////////set chunk size//////////////// + sendChunkSize(4096); + ////////////window Acknowledgement size///// + sendAcknowledgementSize(5000000); + ///////////set peerBandwidth//////////////// + sendPeerBandwidth(5000000); + m_strApp = params["app"].as_string(); bool ok = true; //(app == APP_NAME); AMFValue version(AMF_OBJECT); - version.set("fmsVer", "FMS/3,5,3,888"); - version.set("capabilities", 127.0); + version.set("fmsVer", "ZLMediaKit"); + version.set("capabilities", 255.0); version.set("mode", 1.0); AMFValue status(AMF_OBJECT); status.set("level", ok ? "status" : "error"); status.set("code", ok ? "NetConnection.Connect.Success" : "NetConnection.Connect.InvalidApp"); status.set("description", ok ? "Connection succeeded." : "InvalidApp."); - status.set("objectEncoding", (double) (dec.getVersion())); + status.set("objectEncoding", amfVer); + AMFValue data(AMF_ECMA_ARRAY); + data.set("version","0.0.0.1"); + status.set("data", data); sendReply(ok ? "_result" : "_error", version, status); if (!ok) { throw std::runtime_error("Unsupported application: " + m_strApp); } - ////////////window Acknowledgement size///// - sendAcknowledgementSize(2500000); - ///////////set peerBandwidth//////////////// - sendPeerBandwidth(2500000); - ///////////set chunk size//////////////// -#ifndef _DEBUG - sendChunkSize(60000); -#endif + AMFEncoder invoke; + invoke << "onBWDone" << 0.0 << nullptr; + sendResponse(MSG_CMD, invoke.data()); } void RtmpSession::onCmd_createStream(AMFDecoder &dec) { @@ -108,7 +118,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { status.set("level", ok ? "status" : "error"); status.set("code", ok ? "NetStream.Publish.Start" : "NetStream.Publish.BadName"); status.set("description", ok ? "Started publishing stream." : "Already publishing."); - status.set("clientid", "ASAICiss"); + status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { throw std::runtime_error( StrPrinter << "Already publishing:" << m_strApp << "/" << m_strId << endl); @@ -136,34 +146,52 @@ void RtmpSession::onCmd_play(AMFDecoder &dec) { auto src = RtmpMediaSource::find(m_strApp,m_strId,true); bool ok = (src.operator bool()); ok = ok && src->ready(); + + //stream begin + sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA); + // onStatus(NetStream.Play.Reset) AMFValue status(AMF_OBJECT); - status.set("level", ok ? "status" : "error"); status.set("code", ok ? "NetStream.Play.Reset" : "NetStream.Play.StreamNotFound"); - status.set("description", ok ? "Resetting and playing stream." : "No such stream."); - status.set("details", "stream"); - status.set("clientid", "ASAICiss"); + status.set("description", ok ? "Resetting and playing." : "No such stream."); + status.set("details", m_strId); + status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { throw std::runtime_error( StrPrinter << "No such stream:" << m_strApp << " " << m_strId << endl); } -//stream begin - sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA); + // onStatus(NetStream.Play.Start) status.clear(); status.set("level", "status"); status.set("code", "NetStream.Play.Start"); - status.set("description", "Started playing stream."); - status.set("details", "stream"); - status.set("clientid", "ASAICiss"); - sendReply("onStatus", AMFValue(), status); + status.set("description", "Started playing."); + status.set("details", m_strId); + status.set("clientid", "0"); + sendReply("onStatus", nullptr, status); // |RtmpSampleAccess(true, true) AMFEncoder invoke; invoke << "|RtmpSampleAccess" << true << true; sendResponse(MSG_DATA, invoke.data()); + //onStatus(NetStream.Data.Start) + invoke.clear(); + AMFValue obj(AMF_OBJECT); + obj.set("code","NetStream.Data.Start"); + invoke << "onStatus" << obj; + sendResponse(MSG_DATA, invoke.data()); + + //onStatus(NetStream.Play.PublishNotify) + status.clear(); + status.set("level", "status"); + status.set("code", "NetStream.Play.PublishNotify"); + status.set("description", "Now published."); + status.set("details", m_strId); + status.set("clientid", "0"); + sendReply("onStatus", nullptr, status); + // onMetaData invoke.clear(); invoke << "onMetaData" << src->getMetaData(); @@ -264,14 +292,14 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { switch (chunkData.typeId) { case MSG_CMD: case MSG_CMD3: { - AMFDecoder dec(chunkData.strBuf, 0); + AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); onProcessCmd(dec); } break; case MSG_DATA: case MSG_DATA3: { - AMFDecoder dec(chunkData.strBuf, 0); + AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); std::string type = dec.load(); TraceL << "notify:" << type; if (type == "@setDataFrame") { diff --git a/src/Rtmp/amf.cpp b/src/Rtmp/amf.cpp index 3f572ec9..9bef838f 100644 --- a/src/Rtmp/amf.cpp +++ b/src/Rtmp/amf.cpp @@ -99,7 +99,7 @@ AMFValue::AMFValue(const AMFValue &from) : } AMFValue::AMFValue(AMFValue &&from) { - *this = from; + *this = std::forward(from); } AMFValue& AMFValue::operator =(const AMFValue &from) { diff --git a/src/Rtmp/amf.h b/src/Rtmp/amf.h index 43eec835..91a9bd81 100644 --- a/src/Rtmp/amf.h +++ b/src/Rtmp/amf.h @@ -5,7 +5,7 @@ #include #include #include - +#include enum AMFType { AMF_NUMBER, AMF_INTEGER, @@ -102,11 +102,11 @@ public: } } - const AMFValue &operator[](const std::string &s) const { + const AMFValue &operator[](const char *str) const { if (m_type != AMF_OBJECT && m_type != AMF_ECMA_ARRAY) { throw std::runtime_error("AMF not a object"); } - auto i = m_value.object->find(s); + auto i = m_value.object->find(str); if (i == m_value.object->end()) { static AMFValue val(AMF_NULL); return val; @@ -123,7 +123,7 @@ public: } } - bool operator()() const { + operator bool() const{ return m_type != AMF_NULL; } void set(const std::string &s, const AMFValue &val) { @@ -141,7 +141,7 @@ public: } private: - typedef std::unordered_map mapType; + typedef std::map mapType; typedef std::vector arrayType; AMFType m_type; diff --git a/src/Rtsp/RtspToRtmpMediaSource.cpp b/src/Rtsp/RtspToRtmpMediaSource.cpp index 3510fa35..d6da2627 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.cpp +++ b/src/Rtsp/RtspToRtmpMediaSource.cpp @@ -75,7 +75,7 @@ void RtspToRtmpMediaSource::onGetH264(const H264Frame& frame) { m_rtmpPkt.strBuf.append(&frame.data[4], frame.data.size() - 4); m_rtmpPkt.bodySize = m_rtmpPkt.strBuf.size(); - m_rtmpPkt.chunkId = CHUNK_CLIENT_REQUEST_AFTER; + m_rtmpPkt.chunkId = CHUNK_MEDIA; m_rtmpPkt.streamId = STREAM_MEDIA; m_rtmpPkt.timeStamp = frame.timeStamp; m_rtmpPkt.typeId = MSG_VIDEO; @@ -115,7 +115,7 @@ void RtspToRtmpMediaSource::makeVideoConfigPkt() { m_rtmpPkt.strBuf.append(m_pps); m_rtmpPkt.bodySize = m_rtmpPkt.strBuf.size(); - m_rtmpPkt.chunkId = CHUNK_CLIENT_REQUEST_AFTER; + m_rtmpPkt.chunkId = CHUNK_MEDIA; m_rtmpPkt.streamId = STREAM_MEDIA; m_rtmpPkt.timeStamp = 0; m_rtmpPkt.typeId = MSG_VIDEO; @@ -135,7 +135,7 @@ void RtspToRtmpMediaSource::onGetAdts(const AdtsFrame& frame) { m_rtmpPkt.strBuf.append((char *) frame.data + 7, frame.aac_frame_length - 7); m_rtmpPkt.bodySize = m_rtmpPkt.strBuf.size(); - m_rtmpPkt.chunkId = CHUNK_CLIENT_REQUEST_AFTER; + m_rtmpPkt.chunkId = CHUNK_MEDIA; m_rtmpPkt.streamId = STREAM_MEDIA; m_rtmpPkt.timeStamp = frame.timeStamp; m_rtmpPkt.typeId = MSG_AUDIO; @@ -175,7 +175,7 @@ void RtspToRtmpMediaSource::makeAudioConfigPkt() { m_rtmpPkt.strBuf.append(m_pParser->getAudioCfg()); m_rtmpPkt.bodySize = m_rtmpPkt.strBuf.size(); - m_rtmpPkt.chunkId = CHUNK_CLIENT_REQUEST_AFTER; + m_rtmpPkt.chunkId = CHUNK_MEDIA; m_rtmpPkt.streamId = STREAM_MEDIA; m_rtmpPkt.timeStamp = 0; m_rtmpPkt.typeId = MSG_AUDIO; @@ -186,7 +186,7 @@ void RtspToRtmpMediaSource::makeMetaData() { m_pRtmpSrc.reset(new RtmpMediaSource(getApp(),getId())); m_pRtmpSrc->setOnSeek(m_onSeek); m_pRtmpSrc->setOnStamp(m_onStamp); - AMFValue metaData(AMF_ECMA_ARRAY); + AMFValue metaData(AMF_OBJECT); metaData.set("duration", m_pParser->getDuration()); metaData.set("fileSize", 0); if (m_pParser->containVideo()) {