diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 957cbe45..86ae8bfa 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -62,6 +62,22 @@ void RtspPusher::publish(const string &strUrl) { publish(url,user,pwd,eType); } +void RtspPusher::onShutdown(const SockException &ex) { + if(_onShutdown){ + _onShutdown(ex); + } + teardown(); +} +void RtspPusher::onPublishResult(const SockException &ex) { + _pPublishTimer.reset(); + if(_onPublished){ + _onPublished(ex); + } + if(ex){ + teardown(); + } +} + void RtspPusher::publish(const string & strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { DebugL << strUrl << " " << (strUser.size() ? strUser : "null") << " " @@ -102,7 +118,6 @@ void RtspPusher::publish(const string & strUrl, const string &strUser, const str return false; } strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout")); - strongSelf->teardown(); return false; },getPoller())); @@ -131,7 +146,6 @@ void RtspPusher::onRecv(const Buffer::Ptr &pBuf){ SockException ex(Err_other, e.what()); onPublishResult(ex); onShutdown(ex); - teardown(); } } @@ -145,7 +159,7 @@ void RtspPusher::onWholeRtspPacket(Parser &parser) { } -bool RtspPusher::sendAnnounce() { +void RtspPusher::sendAnnounce() { auto src = _pMediaSrc.lock(); if (!src) { throw std::runtime_error("the media source was released"); @@ -159,7 +173,7 @@ bool RtspPusher::sendAnnounce() { } _onHandshake = std::bind(&RtspPusher::handleResAnnounce,this, placeholders::_1); - return sendRtspRequest("ANNOUNCE",_strUrl,{},src->getSdp()); + sendRtspRequest("ANNOUNCE",_strUrl,{},src->getSdp()); } void RtspPusher::handleResAnnounce(const Parser &parser) { @@ -224,15 +238,15 @@ bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { return false; } -bool RtspPusher::sendSetup(unsigned int trackIndex) { +void RtspPusher::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex); - auto &track = _aTrackInfo[trackIndex]; auto baseUrl = _strContentBase + "/" + track->_control_surffix; switch (_eType) { case Rtsp::RTP_TCP: { - return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); + sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); } + break; case Rtsp::RTP_UDP: { _apUdpSock[trackIndex].reset(new Socket()); if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { @@ -240,10 +254,11 @@ bool RtspPusher::sendSetup(unsigned int trackIndex) { throw std::runtime_error("open udp sock err"); } int port = _apUdpSock[trackIndex]->get_local_port(); - return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); + sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } + break; default: - return false; + break; } } @@ -293,9 +308,9 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) sendRecord(); } -bool RtspPusher::sendOptions() { +void RtspPusher::sendOptions() { _onHandshake = [this](const Parser& parser){}; - return sendRtspRequest("OPTIONS",_strContentBase); + sendRtspRequest("OPTIONS",_strContentBase); } inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) { @@ -332,7 +347,7 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { } -bool RtspPusher::sendRecord() { +void RtspPusher::sendRecord() { _onHandshake = [this](const Parser& parser){ auto src = _pMediaSrc.lock(); if (!src) { @@ -352,7 +367,6 @@ bool RtspPusher::sendRecord() { auto strongSelf = weakSelf.lock(); if(strongSelf){ strongSelf->onShutdown(SockException(Err_other,"媒体源被释放")); - strongSelf->teardown(); } }); if(_eType != Rtsp::RTP_TCP){ @@ -363,7 +377,8 @@ bool RtspPusher::sendRecord() { if (!strongSelf){ return false; } - return strongSelf->sendOptions(); + strongSelf->sendOptions(); + return true; },getPoller())); } onPublishResult(SockException(Err_success,"success")); @@ -371,10 +386,10 @@ bool RtspPusher::sendRecord() { (*this) << SocketFlags(kSockFlags); SockUtil::setNoDelay(_sock->rawFD(),false); }; - return sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); + sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); } -bool RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header,const string &sdp ) { +void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header,const string &sdp ) { string key; StrCaseMap header_map; int i = 0; @@ -385,9 +400,9 @@ bool RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std key = val; } } - return sendRtspRequest(cmd,url,header_map,sdp); + sendRtspRequest(cmd,url,header_map,sdp); } -bool RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const,const string &sdp ) { +void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const,const string &sdp ) { auto header = header_const; header.emplace("CSeq",StrPrinter << _uiCseq++); header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); @@ -445,7 +460,7 @@ bool RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC if(!sdp.empty()){ printer << sdp; } - return send(printer) > 0; + send(printer); } diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index f8571550..0bab5603 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -50,25 +50,13 @@ protected: void onRtpPacket(const char *data,uint64_t len) override {}; private: void publish(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ); + void onShutdown(const SockException &ex); + void onPublishResult(const SockException &ex); - void onShutdown(const SockException &ex) { - _pPublishTimer.reset(); - if(_onShutdown){ - _onShutdown(ex); - } - _pRtspReader.reset(); - } - void onPublishResult(const SockException &ex) { - _pPublishTimer.reset(); - if(_onPublished){ - _onPublished(ex); - } - } - - bool sendAnnounce(); - bool sendSetup(unsigned int uiTrackIndex); - bool sendRecord(); - bool sendOptions(); + void sendAnnounce(); + void sendSetup(unsigned int uiTrackIndex); + void sendRecord(); + void sendOptions(); void handleResAnnounce(const Parser &parser); void handleResSetup(const Parser &parser, unsigned int uiTrackIndex); @@ -77,8 +65,8 @@ private: inline int getTrackIndexByTrackType(TrackType type); void sendRtpPacket(const RtpPacket::Ptr & pkt) ; - bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); - bool sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); + void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); + void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); private: //rtsp鉴权相关 string _rtspMd5Nonce;