diff --git a/ZLToolKit b/ZLToolKit index ea623e89..09639d39 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit ea623e89153b7f5e4f693e3f2d4c5e60b79540ed +Subproject commit 09639d39eee069de0d93da8acbf87aacca5a04cd diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 5ec122de..44c59290 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -64,12 +64,17 @@ void RtmpPusher::teardown() { } void RtmpPusher::onPublishResult(const SockException &ex) { - _pPublishTimer.reset(); - if(_onPublished){ - _onPublished(ex); - _onPublished = nullptr; - }else if(_onShutdown){ - _onShutdown(ex); + if(_pPublishTimer){ + //播放结果回调 + _pPublishTimer.reset(); + if(_onPublished){ + _onPublished(ex); + } + } else { + //播放成功后异常断开回调 + if(_onShutdown){ + _onShutdown(ex); + } } if(ex){ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index c1ea1eec..3399bff1 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -137,7 +137,6 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string return false; } strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); - strongSelf->teardown(); return false; },getPoller())); @@ -149,7 +148,6 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string void RtspPlayer::onConnect(const SockException &err){ if(err.getErrCode()!=Err_success) { onPlayResult_l(err); - teardown(); return; } @@ -160,7 +158,7 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { input(pBuf->data(),pBuf->size()); } void RtspPlayer::onErr(const SockException &ex) { - onShutdown_l (ex); + onPlayResult_l(ex); } // from live555 bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { @@ -236,18 +234,19 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { sendSetup(0); } //发送SETUP命令 -bool RtspPlayer::sendSetup(unsigned int trackIndex) { +void RtspPlayer::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex); - auto &track = _aTrackInfo[trackIndex]; auto baseUrl = _strContentBase + "/" + track->_control_surffix; switch (_eType) { case Rtsp::RTP_TCP: { - return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); + sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); } + break; case Rtsp::RTP_MULTICAST: { - return sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"}); + sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"}); } + break; case Rtsp::RTP_UDP: { _apUdpSock[trackIndex].reset(new Socket()); if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { @@ -255,10 +254,11 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) { throw std::runtime_error("open udp sock err"); } int port = _apUdpSock[trackIndex]->get_local_port(); - return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); + sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } + break; default: - return false; + break; } } @@ -349,28 +349,29 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) if (!strongSelf){ return false; } - return strongSelf->sendOptions(); + strongSelf->sendOptions(); + return true; },getPoller())); } pause(false); } -bool RtspPlayer::sendOptions() { +void RtspPlayer::sendOptions() { _onHandshake = [](const Parser& parser){ // DebugL << "options response"; }; - return sendRtspRequest("OPTIONS",_strContentBase); + sendRtspRequest("OPTIONS",_strContentBase); } -bool RtspPlayer::sendDescribe() { +void RtspPlayer::sendDescribe() { //发送DESCRIBE命令后处理函数:handleResDESCRIBE _onHandshake = std::bind(&RtspPlayer::handleResDESCRIBE,this, placeholders::_1); - return sendRtspRequest("DESCRIBE",_strUrl,{"Accept","application/sdp"}); + sendRtspRequest("DESCRIBE",_strUrl,{"Accept","application/sdp"}); } -bool RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ +void RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ if(!bPause){ //修改时间轴 int iTimeInc = seekMS - getProgressMilliSecond(); @@ -383,9 +384,9 @@ bool RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ //开启或暂停rtsp _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause); - return sendRtspRequest(bPause ? "PAUSE" : "PLAY", - _strContentBase, - {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); + sendRtspRequest(bPause ? "PAUSE" : "PLAY", + _strContentBase, + {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); } void RtspPlayer::pause(bool bPause) { sendPause(bPause, getProgressMilliSecond()); @@ -440,8 +441,6 @@ void RtspPlayer::onWholeRtspPacket(Parser &parser) { } catch (std::exception &err) { SockException ex(Err_other, err.what()); onPlayResult_l(ex); - onShutdown_l(ex); - teardown(); } } @@ -510,7 +509,7 @@ void RtspPlayer::seekToMilliSecond(uint32_t ms) { sendPause(false,ms); } -bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header) { +void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header) { string key; StrCaseMap header_map; int i = 0; @@ -521,9 +520,9 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std key = val; } } - return sendRtspRequest(cmd,url,header_map); + sendRtspRequest(cmd,url,header_map); } -bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { +void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { auto header = header_const; header.emplace("CSeq",StrPrinter << _uiCseq++); header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); @@ -570,44 +569,45 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC for (auto &pr : header){ printer << pr.first << ": " << pr.second << "\r\n"; } - return send(printer << "\r\n") > 0; + send(printer << "\r\n"); } - -void RtspPlayer::onShutdown_l(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); - _pPlayTimer.reset(); - _pRtpTimer.reset(); - _pBeatTimer.reset(); - onShutdown(ex); -} void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) { _rtpTicker.resetTime(); onRecvRTP(pRtppt,track); } void RtspPlayer::onPlayResult_l(const SockException &ex) { WarnL << ex.getErrCode() << " " << ex.what(); - _pPlayTimer.reset(); - _pRtpTimer.reset(); - if (!ex) { - _rtpTicker.resetTime(); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - int timeoutMS = (*this)[kMediaTimeoutMS].as(); - _pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return false; - } - if(strongSelf->_rtpTicker.elapsedTime()> timeoutMS) { - //recv rtp timeout! - strongSelf->onShutdown_l(SockException(Err_timeout,"recv rtp timeout")); - strongSelf->teardown(); - return false; - } - return true; - },getPoller())); + if(_pPlayTimer){ + //播放结果回调 + _pPlayTimer.reset(); + onPlayResult(ex); + if(!ex){ + //播放成功 + _rtpTicker.resetTime(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + int timeoutMS = (*this)[kMediaTimeoutMS].as(); + _pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { + auto strongSelf=weakSelf.lock(); + if(!strongSelf) { + return false; + } + if(strongSelf->_rtpTicker.elapsedTime()> timeoutMS) { + //recv rtp timeout! + strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtp timeout")); + return false; + } + return true; + },getPoller())); + } + } else { + //播放成功后异常断开回调 + onShutdown(ex); + } + + if(ex){ + teardown(); } - onPlayResult(ex); } int RtspPlayer::getTrackIndexByControlSuffix(const string &controlSuffix) const{ diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 50f9aaa7..a89c4cc8 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -85,7 +85,6 @@ protected: */ void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; private: - void onShutdown_l(const SockException &ex); void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); void onPlayResult_l(const SockException &ex); @@ -104,13 +103,13 @@ private: void handleResPAUSE(const Parser &parser, bool bPause); //发送SETUP命令 - bool sendSetup(unsigned int uiTrackIndex); - bool sendPause(bool bPause,uint32_t ms); - bool sendOptions(); - bool sendDescribe(); + void sendSetup(unsigned int uiTrackIndex); + void sendPause(bool bPause,uint32_t ms); + void sendOptions(); + void sendDescribe(); - bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); - bool sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header); + void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); + void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header); private: string _strUrl; SdpAttr _sdpAttr; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 718fe13f..4ee58975 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -63,12 +63,17 @@ void RtspPusher::publish(const string &strUrl) { } void RtspPusher::onPublishResult(const SockException &ex) { - _pPublishTimer.reset(); - if(_onPublished){ - _onPublished(ex); - _onPublished = nullptr; - }else if(_onShutdown){ - _onShutdown(ex); + if(_pPublishTimer){ + //播放结果回调 + _pPublishTimer.reset(); + if(_onPublished){ + _onPublished(ex); + } + } else { + //播放成功后异常断开回调 + if(_onShutdown){ + _onShutdown(ex); + } } if(ex){ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index b3bb742b..0d7ede12 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -549,6 +549,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { return false; } trackRef->_inited = true; //现在初始化 + trackRef->_interleaved = trackRef->_type * 2; if(_rtpType == Rtsp::RTP_Invalid){ auto strTransport = parser["Transport"]; @@ -566,7 +567,6 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { switch (_rtpType) { case Rtsp::RTP_TCP: { - trackRef->_interleaved = trackRef->_type * 2; sendRtspResponse("200 OK", {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";" diff --git a/tests/test_pusher.cpp b/tests/test_pusher.cpp index cb2efea6..48d3227a 100644 --- a/tests/test_pusher.cpp +++ b/tests/test_pusher.cpp @@ -48,6 +48,8 @@ void rePushDelay(const string &schema,const string &vhost,const string &app, con void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { //创建推流器并绑定一个MediaSource pusher.reset(new MediaPusher(schema,vhost, app, stream)); + //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp +// (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();