From e6d511cc9e1f3aba97d6157c28b7dcb2eb85ad31 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 19 Nov 2019 15:52:02 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=92=AD=E6=94=BE=E6=88=90?= =?UTF-8?q?=E5=8A=9F=E4=B8=8E=E4=B8=AD=E9=80=94=E6=96=AD=E5=BC=80=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E8=A7=A6=E5=8F=91=E7=B4=8A=E4=B9=B1=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98:#143?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/media-server | 2 +- src/Rtmp/RtmpPlayer.cpp | 108 +++++++++++++++--------------- src/Rtmp/RtmpPlayer.h | 5 +- src/Rtsp/RtspPlayer.cpp | 145 ++++++++++++++++++++++------------------ src/Rtsp/RtspPlayer.h | 7 +- 5 files changed, 140 insertions(+), 127 deletions(-) diff --git a/3rdpart/media-server b/3rdpart/media-server index 40edf624..678678b2 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 40edf6243d9d99676062062efdec203b24a178aa +Subproject commit 678678b2ee7118d21d33a90a303698e0da02790c diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index cb5c4900..5251ca42 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -34,44 +34,35 @@ using namespace mediakit::Client; namespace mediakit { -unordered_map RtmpPlayer::g_mapCmd; RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) { - static onceToken token([]() { - g_mapCmd.emplace("_error",&RtmpPlayer::onCmd_result); - g_mapCmd.emplace("_result",&RtmpPlayer::onCmd_result); - g_mapCmd.emplace("onStatus",&RtmpPlayer::onCmd_onStatus); - g_mapCmd.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData); - }, []() {}); - } RtmpPlayer::~RtmpPlayer() { DebugL << endl; } + void RtmpPlayer::teardown() { if (alive()) { - _strApp.clear(); - _strStream.clear(); - _strTcUrl.clear(); - - { - lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.clear(); - } - { - lock_guard lck(_mtxOnStatusCB); - _dqOnStatusCB.clear(); - } - _pBeatTimer.reset(); - _pPlayTimer.reset(); - _pMediaTimer.reset(); - _iSeekTo = 0; - CLEAR_ARR(_aiFistStamp); - CLEAR_ARR(_aiNowStamp); - reset(); - shutdown(SockException(Err_shutdown,"teardown")); + shutdown(SockException(Err_shutdown,"teardown")); } + _strApp.clear(); + _strStream.clear(); + _strTcUrl.clear(); + _pBeatTimer.reset(); + _pPlayTimer.reset(); + _pMediaTimer.reset(); + _iSeekTo = 0; + RtmpProtocol::reset(); + + CLEAR_ARR(_aiFistStamp); + CLEAR_ARR(_aiNowStamp); + + lock_guard lck(_mtxOnResultCB); + _mapOnResultCB.clear(); + lock_guard lck2(_mtxOnStatusCB); + _dqOnStatusCB.clear(); } + void RtmpPlayer::play(const string &strUrl) { teardown(); string strHost = FindField(strUrl.data(), "://", "/"); @@ -80,7 +71,7 @@ void RtmpPlayer::play(const string &strUrl) { _strTcUrl = string("rtmp://") + strHost + "/" + _strApp; if (!_strApp.size() || !_strStream.size()) { - onPlayResult_l(SockException(Err_other,"rtmp url非法")); + onPlayResult_l(SockException(Err_other,"rtmp url非法"),false); return; } DebugL << strHost << " " << _strApp << " " << _strStream; @@ -104,7 +95,7 @@ void RtmpPlayer::play(const string &strUrl) { if(!strongSelf) { return false; } - strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout")); + strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"),false); return false; },getPoller())); @@ -112,53 +103,52 @@ void RtmpPlayer::play(const string &strUrl) { startConnect(strHost, iPort , playTimeOutSec); } void RtmpPlayer::onErr(const SockException &ex){ - onPlayResult_l(ex); + //定时器_pPlayTimer为空后表明握手结束了 + onPlayResult_l(ex, !_pPlayTimer); } -void RtmpPlayer::onPlayResult_l(const SockException &ex) { +void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { WarnL << ex.getErrCode() << " " << ex.what(); if(!ex){ - //恢复rtmp接收超时定时器 + //播放成功,恢复rtmp接收超时定时器 _mediaTicker.resetTime(); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); int timeoutMS = (*this)[kMediaTimeoutMS].as(); + //创建rtmp数据接收超时检测定时器 _pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) { - //recv media timeout! - strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtmp timeout")); + //接收rtmp媒体数据超时 + strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtmp timeout"),true); return false; } return true; },getPoller())); } - if (_pPlayTimer) { + if (!handshakeCompleted) { //开始播放阶段 _pPlayTimer.reset(); onPlayResult(ex); - }else { - //播放中途阶段 - if (ex) { - //播放成功后异常断开回调 - onShutdown(ex); - }else{ - //恢复播放 - onResume(); - } - } + } else if (ex) { + //播放成功后异常断开回调 + onShutdown(ex); + } else { + //恢复播放 + onResume(); + } if(ex){ teardown(); } } void RtmpPlayer::onConnect(const SockException &err){ - if(err.getErrCode()!=Err_success) { - onPlayResult_l(err); + if(err.getErrCode() != Err_success) { + onPlayResult_l(err, false); return; } weak_ptr weakSelf= dynamic_pointer_cast(shared_from_this()); @@ -175,7 +165,8 @@ void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){ onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); - onPlayResult_l(ex); + //定时器_pPlayTimer为空后表明握手结束了 + onPlayResult_l(ex, !_pPlayTimer); } } @@ -253,7 +244,7 @@ inline void RtmpPlayer::send_pause(bool bPause) { }else{ _bPaused = bPause; if(!bPause){ - onPlayResult_l(SockException(Err_success, "rtmp resum success")); + onPlayResult_l(SockException(Err_success, "resum rtmp success"), true); }else{ //暂停播放 _pMediaTimer.reset(); @@ -327,7 +318,7 @@ void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) { void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) { //TraceL << ui32StreamId; - onPlayResult_l(SockException(Err_other,"rtmp stream dry")); + onPlayResult_l(SockException(Err_other,"rtmp stream dry"), true); } void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) { @@ -343,7 +334,7 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) { onMediaData(packet); }else{ //先触发onPlayResult事件,这个时候解码器才能初始化完毕 - onPlayResult_l(SockException(Err_success,"play rtmp success")); + onPlayResult_l(SockException(Err_success,"play rtmp success"), false); //触发onPlayResult事件后,再把帧数据输入到解码器 onMediaData(packet); } @@ -351,6 +342,15 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) { void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { + typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec); + static unordered_map s_func_map; + static onceToken token([]() { + s_func_map.emplace("_error",&RtmpPlayer::onCmd_result); + s_func_map.emplace("_result",&RtmpPlayer::onCmd_result); + s_func_map.emplace("onStatus",&RtmpPlayer::onCmd_onStatus); + s_func_map.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData); + }, []() {}); + switch (chunkData.typeId) { case MSG_CMD: case MSG_CMD3: @@ -358,8 +358,8 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { case MSG_DATA3: { AMFDecoder dec(chunkData.strBuf, 0); std::string type = dec.load(); - auto it = g_mapCmd.find(type); - if(it != g_mapCmd.end()){ + auto it = s_func_map.find(type); + if(it != s_func_map.end()){ auto fun = it->second; (this->*fun)(dec); }else{ diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index e63ee668..e06e8191 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -61,7 +61,7 @@ protected: void seekToMilliSecond(uint32_t ms); protected: void onMediaData_l(const RtmpPacket::Ptr &chunkData); - void onPlayResult_l(const SockException &ex); + void onPlayResult_l(const SockException &ex, bool handshakeCompleted); //form Tcpclient void onRecv(const Buffer::Ptr &pBuf) override; @@ -104,9 +104,6 @@ private: deque > _dqOnStatusCB; recursive_mutex _mtxOnStatusCB; - typedef void (RtmpPlayer::*rtmpCMDHandle)(AMFDecoder &dec); - static unordered_map g_mapCmd; - //超时功能实现 Ticker _mediaTicker; std::shared_ptr _pMediaTimer; diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index aa7dfce3..7c9c74ad 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -42,11 +42,17 @@ using namespace mediakit::Client; namespace mediakit { +enum PlayType { + type_play = 0, + type_pause, + type_seek +}; + RtspPlayer::RtspPlayer(const EventPoller::Ptr &poller) : TcpClient(poller){ RtpReceiver::setPoolSize(64); } RtspPlayer::~RtspPlayer(void) { - DebugL<onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); + strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"),false); return false; },getPoller())); @@ -158,8 +163,8 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co startConnect(ip, port , playTimeOutSec); } void RtspPlayer::onConnect(const SockException &err){ - if(err.getErrCode()!=Err_success) { - onPlayResult_l(err); + if(err.getErrCode() != Err_success) { + onPlayResult_l(err,false); return; } @@ -170,7 +175,8 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { input(pBuf->data(),pBuf->size()); } void RtspPlayer::onErr(const SockException &ex) { - onPlayResult_l(ex); + //定时器_pPlayTimer为空后表明握手结束了 + onPlayResult_l(ex,!_pPlayTimer); } // from live555 bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { @@ -403,8 +409,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) WarnL << "收到其他地址的rtcp数据:" << inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); return; } - strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], - (unsigned char *) buf->data(), buf->size()); + strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_aTrackInfo[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); }); } } @@ -416,14 +421,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) } //所有setup命令发送完毕 //发送play命令 - sendPause(false, 0,false); -} - -void RtspPlayer::sendOptions() { - _onHandshake = [](const Parser& parser){ -// DebugL << "options response"; - }; - sendRtspRequest("OPTIONS",_strContentBase); + sendPause(type_play, 0); } void RtspPlayer::sendDescribe() { @@ -432,46 +430,67 @@ void RtspPlayer::sendDescribe() { sendRtspRequest("DESCRIBE",_strUrl,{"Accept","application/sdp"}); } - -void RtspPlayer::sendPause(bool bPause,uint32_t seekMS,bool range){ +void RtspPlayer::sendPause(int type , uint32_t seekMS){ + _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,type); //开启或暂停rtsp - _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause); - if(!bPause && range){ - sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase, - {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); - } else{ - sendRtspRequest(bPause ? "PAUSE" : "PLAY", _strContentBase); - } - + switch (type){ + case type_pause: + sendRtspRequest("PAUSE", _strContentBase); + break; + case type_play: + sendRtspRequest("PLAY", _strContentBase); + break; + case type_seek: + sendRtspRequest("PLAY", _strContentBase, {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); + break; + default: + WarnL << "unknown type : " << type; + _onHandshake = nullptr; + break; + } } void RtspPlayer::pause(bool bPause) { - sendPause(bPause, getProgressMilliSecond(),false); + sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond()); } -void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { +void RtspPlayer::handleResPAUSE(const Parser& parser,int type) { if (parser.Url() != "200") { - WarnL <<(bPause ? "Pause" : "Play") << " failed:" << parser.Url() << " " << parser.Tail() << endl; + switch (type) { + case type_pause: + WarnL << "Pause failed:" << parser.Url() << " " << parser.Tail() << endl; + break; + case type_play: + WarnL << "Play failed:" << parser.Url() << " " << parser.Tail() << endl; + break; + case type_seek: + WarnL << "Seek failed:" << parser.Url() << " " << parser.Tail() << endl; + break; + } return; } - if (!bPause) { - uint32_t iSeekTo = 0; - //修正时间轴 - auto strRange = parser["Range"]; - if (strRange.size()) { - auto strStart = FindField(strRange.data(), "npt=", "-"); - if (strStart == "now") { - strStart = "0"; - } - iSeekTo = 1000 * atof(strStart.data()); - DebugL << "seekTo(ms):" << iSeekTo ; - } - //设置相对时间戳 - _stamp[0].setRelativeStamp(iSeekTo); - _stamp[1].setRelativeStamp(iSeekTo); - onPlayResult_l(SockException(Err_success, "rtsp play success")); - } else { + + if (type == type_pause) { + //暂停成功! _pRtpTimer.reset(); + return; } + + //play或seek成功 + uint32_t iSeekTo = 0; + //修正时间轴 + auto strRange = parser["Range"]; + if (strRange.size()) { + auto strStart = FindField(strRange.data(), "npt=", "-"); + if (strStart == "now") { + strStart = "0"; + } + iSeekTo = 1000 * atof(strStart.data()); + DebugL << "seekTo(ms):" << iSeekTo; + } + //设置相对时间戳 + _stamp[0].setRelativeStamp(iSeekTo); + _stamp[1].setRelativeStamp(iSeekTo); + onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), type == type_seek); } void RtspPlayer::onWholeRtspPacket(Parser &parser) { @@ -483,8 +502,8 @@ void RtspPlayer::onWholeRtspPacket(Parser &parser) { } parser.Clear(); } catch (std::exception &err) { - SockException ex(Err_other, err.what()); - onPlayResult_l(ex); + //定时器_pPlayTimer为空后表明握手结束了 + onPlayResult_l(SockException(Err_other, err.what()),!_pPlayTimer); } } @@ -674,7 +693,7 @@ uint32_t RtspPlayer::getProgressMilliSecond() const{ return MAX(_stamp[0].getRelativeStamp(),_stamp[1].getRelativeStamp()); } void RtspPlayer::seekToMilliSecond(uint32_t ms) { - sendPause(false,ms, true); + sendPause(type_seek,ms); } void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header) { @@ -764,7 +783,7 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &tra } -void RtspPlayer::onPlayResult_l(const SockException &ex) { +void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { WarnL << ex.getErrCode() << " " << ex.what(); if(!ex){ @@ -772,33 +791,31 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) { _rtpTicker.resetTime(); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); int timeoutMS = (*this)[kMediaTimeoutMS].as(); - _pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { + //创建rtp数据接收超时检测定时器 + _pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } if(strongSelf->_rtpTicker.elapsedTime()> timeoutMS) { - //recv rtp timeout! - strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtp timeout")); + //接收rtp媒体数据包超时 + strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtp timeout"), true); return false; } return true; },getPoller())); } - if (_pPlayTimer) { + if (!handshakeCompleted) { //开始播放阶段 _pPlayTimer.reset(); onPlayResult(ex); - }else { - //播放中途阶段 - if (ex) { - //播放成功后异常断开回调 - onShutdown(ex); - }else{ - //恢复播放 - onResume(); - } + } else if (ex) { + //播放成功后异常断开回调 + onShutdown(ex); + } else { + //恢复播放 + onResume(); } if(ex){ diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index be170953..a633ab91 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -101,7 +101,7 @@ protected: void onErr(const SockException &ex) override; private: void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); - void onPlayResult_l(const SockException &ex); + void onPlayResult_l(const SockException &ex , bool handshakeCompleted); int getTrackIndexByControlSuffix(const string &controlSuffix) const; int getTrackIndexByInterleaved(int interleaved) const; @@ -111,12 +111,11 @@ private: void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); - void handleResPAUSE(const Parser &parser, bool bPause); + void handleResPAUSE(const Parser &parser, int type); //发送SETUP命令 void sendSetup(unsigned int uiTrackIndex); - void sendPause(bool bPause,uint32_t ms, bool range); - void sendOptions(); + void sendPause(int type , uint32_t ms); void sendDescribe(); void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap());