修复rtsp推流服务器在udp模式下的bug

优化代码
This commit is contained in:
xiongziliang 2019-03-28 11:52:07 +08:00
parent d208f69730
commit 6045b1b8f8
7 changed files with 84 additions and 73 deletions

@ -1 +1 @@
Subproject commit ea623e89153b7f5e4f693e3f2d4c5e60b79540ed Subproject commit 09639d39eee069de0d93da8acbf87aacca5a04cd

View File

@ -64,12 +64,17 @@ void RtmpPusher::teardown() {
} }
void RtmpPusher::onPublishResult(const SockException &ex) { void RtmpPusher::onPublishResult(const SockException &ex) {
_pPublishTimer.reset(); if(_pPublishTimer){
if(_onPublished){ //播放结果回调
_onPublished(ex); _pPublishTimer.reset();
_onPublished = nullptr; if(_onPublished){
}else if(_onShutdown){ _onPublished(ex);
_onShutdown(ex); }
} else {
//播放成功后异常断开回调
if(_onShutdown){
_onShutdown(ex);
}
} }
if(ex){ if(ex){

View File

@ -137,7 +137,6 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string
return false; return false;
} }
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"));
strongSelf->teardown();
return false; return false;
},getPoller())); },getPoller()));
@ -149,7 +148,6 @@ void RtspPlayer::play(const string &strUrl, const string &strUser, const string
void RtspPlayer::onConnect(const SockException &err){ void RtspPlayer::onConnect(const SockException &err){
if(err.getErrCode()!=Err_success) { if(err.getErrCode()!=Err_success) {
onPlayResult_l(err); onPlayResult_l(err);
teardown();
return; return;
} }
@ -160,7 +158,7 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
input(pBuf->data(),pBuf->size()); input(pBuf->data(),pBuf->size());
} }
void RtspPlayer::onErr(const SockException &ex) { void RtspPlayer::onErr(const SockException &ex) {
onShutdown_l (ex); onPlayResult_l(ex);
} }
// from live555 // from live555
bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) { bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
@ -236,18 +234,19 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
sendSetup(0); sendSetup(0);
} }
//发送SETUP命令 //发送SETUP命令
bool RtspPlayer::sendSetup(unsigned int trackIndex) { void RtspPlayer::sendSetup(unsigned int trackIndex) {
_onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex); _onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex);
auto &track = _aTrackInfo[trackIndex]; auto &track = _aTrackInfo[trackIndex];
auto baseUrl = _strContentBase + "/" + track->_control_surffix; auto baseUrl = _strContentBase + "/" + track->_control_surffix;
switch (_eType) { switch (_eType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1});
} }
break;
case Rtsp::RTP_MULTICAST: { case Rtsp::RTP_MULTICAST: {
return sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"}); sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"});
} }
break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket()); _apUdpSock[trackIndex].reset(new Socket());
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
@ -255,10 +254,11 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) {
throw std::runtime_error("open udp sock err"); throw std::runtime_error("open udp sock err");
} }
int port = _apUdpSock[trackIndex]->get_local_port(); int port = _apUdpSock[trackIndex]->get_local_port();
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1});
} }
break;
default: default:
return false; break;
} }
} }
@ -349,28 +349,29 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
if (!strongSelf){ if (!strongSelf){
return false; return false;
} }
return strongSelf->sendOptions(); strongSelf->sendOptions();
return true;
},getPoller())); },getPoller()));
} }
pause(false); pause(false);
} }
bool RtspPlayer::sendOptions() { void RtspPlayer::sendOptions() {
_onHandshake = [](const Parser& parser){ _onHandshake = [](const Parser& parser){
// DebugL << "options response"; // DebugL << "options response";
}; };
return sendRtspRequest("OPTIONS",_strContentBase); sendRtspRequest("OPTIONS",_strContentBase);
} }
bool RtspPlayer::sendDescribe() { void RtspPlayer::sendDescribe() {
//发送DESCRIBE命令后处理函数:handleResDESCRIBE //发送DESCRIBE命令后处理函数:handleResDESCRIBE
_onHandshake = std::bind(&RtspPlayer::handleResDESCRIBE,this, placeholders::_1); _onHandshake = std::bind(&RtspPlayer::handleResDESCRIBE,this, placeholders::_1);
return sendRtspRequest("DESCRIBE",_strUrl,{"Accept","application/sdp"}); sendRtspRequest("DESCRIBE",_strUrl,{"Accept","application/sdp"});
} }
bool RtspPlayer::sendPause(bool bPause,uint32_t seekMS){ void RtspPlayer::sendPause(bool bPause,uint32_t seekMS){
if(!bPause){ if(!bPause){
//修改时间轴 //修改时间轴
int iTimeInc = seekMS - getProgressMilliSecond(); int iTimeInc = seekMS - getProgressMilliSecond();
@ -383,9 +384,9 @@ bool RtspPlayer::sendPause(bool bPause,uint32_t seekMS){
//开启或暂停rtsp //开启或暂停rtsp
_onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause); _onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause);
return sendRtspRequest(bPause ? "PAUSE" : "PLAY", sendRtspRequest(bPause ? "PAUSE" : "PLAY",
_strContentBase, _strContentBase,
{"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"}); {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"});
} }
void RtspPlayer::pause(bool bPause) { void RtspPlayer::pause(bool bPause) {
sendPause(bPause, getProgressMilliSecond()); sendPause(bPause, getProgressMilliSecond());
@ -440,8 +441,6 @@ void RtspPlayer::onWholeRtspPacket(Parser &parser) {
} catch (std::exception &err) { } catch (std::exception &err) {
SockException ex(Err_other, err.what()); SockException ex(Err_other, err.what());
onPlayResult_l(ex); onPlayResult_l(ex);
onShutdown_l(ex);
teardown();
} }
} }
@ -510,7 +509,7 @@ void RtspPlayer::seekToMilliSecond(uint32_t ms) {
sendPause(false,ms); sendPause(false,ms);
} }
bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) { void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) {
string key; string key;
StrCaseMap header_map; StrCaseMap header_map;
int i = 0; int i = 0;
@ -521,9 +520,9 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std
key = val; key = val;
} }
} }
return sendRtspRequest(cmd,url,header_map); sendRtspRequest(cmd,url,header_map);
} }
bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
auto header = header_const; auto header = header_const;
header.emplace("CSeq",StrPrinter << _uiCseq++); header.emplace("CSeq",StrPrinter << _uiCseq++);
header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
@ -570,44 +569,45 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
for (auto &pr : header){ for (auto &pr : header){
printer << pr.first << ": " << pr.second << "\r\n"; printer << pr.first << ": " << pr.second << "\r\n";
} }
return send(printer << "\r\n") > 0; send(printer << "\r\n");
} }
void RtspPlayer::onShutdown_l(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what();
_pPlayTimer.reset();
_pRtpTimer.reset();
_pBeatTimer.reset();
onShutdown(ex);
}
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) { void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) {
_rtpTicker.resetTime(); _rtpTicker.resetTime();
onRecvRTP(pRtppt,track); onRecvRTP(pRtppt,track);
} }
void RtspPlayer::onPlayResult_l(const SockException &ex) { void RtspPlayer::onPlayResult_l(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what(); WarnL << ex.getErrCode() << " " << ex.what();
_pPlayTimer.reset(); if(_pPlayTimer){
_pRtpTimer.reset(); //播放结果回调
if (!ex) { _pPlayTimer.reset();
_rtpTicker.resetTime(); onPlayResult(ex);
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this()); if(!ex){
int timeoutMS = (*this)[kMediaTimeoutMS].as<int>(); //播放成功
_pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { _rtpTicker.resetTime();
auto strongSelf=weakSelf.lock(); weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
if(!strongSelf) { int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
return false; _pRtpTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() {
} auto strongSelf=weakSelf.lock();
if(strongSelf->_rtpTicker.elapsedTime()> timeoutMS) { if(!strongSelf) {
//recv rtp timeout! return false;
strongSelf->onShutdown_l(SockException(Err_timeout,"recv rtp timeout")); }
strongSelf->teardown(); if(strongSelf->_rtpTicker.elapsedTime()> timeoutMS) {
return false; //recv rtp timeout!
} strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtp timeout"));
return true; return false;
},getPoller())); }
return true;
},getPoller()));
}
} else {
//播放成功后异常断开回调
onShutdown(ex);
}
if(ex){
teardown();
} }
onPlayResult(ex);
} }
int RtspPlayer::getTrackIndexByControlSuffix(const string &controlSuffix) const{ int RtspPlayer::getTrackIndexByControlSuffix(const string &controlSuffix) const{

View File

@ -85,7 +85,6 @@ protected:
*/ */
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
private: private:
void onShutdown_l(const SockException &ex);
void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track);
void onPlayResult_l(const SockException &ex); void onPlayResult_l(const SockException &ex);
@ -104,13 +103,13 @@ private:
void handleResPAUSE(const Parser &parser, bool bPause); void handleResPAUSE(const Parser &parser, bool bPause);
//发送SETUP命令 //发送SETUP命令
bool sendSetup(unsigned int uiTrackIndex); void sendSetup(unsigned int uiTrackIndex);
bool sendPause(bool bPause,uint32_t ms); void sendPause(bool bPause,uint32_t ms);
bool sendOptions(); void sendOptions();
bool sendDescribe(); void sendDescribe();
bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap());
bool sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header);
private: private:
string _strUrl; string _strUrl;
SdpAttr _sdpAttr; SdpAttr _sdpAttr;

View File

@ -63,12 +63,17 @@ void RtspPusher::publish(const string &strUrl) {
} }
void RtspPusher::onPublishResult(const SockException &ex) { void RtspPusher::onPublishResult(const SockException &ex) {
_pPublishTimer.reset(); if(_pPublishTimer){
if(_onPublished){ //播放结果回调
_onPublished(ex); _pPublishTimer.reset();
_onPublished = nullptr; if(_onPublished){
}else if(_onShutdown){ _onPublished(ex);
_onShutdown(ex); }
} else {
//播放成功后异常断开回调
if(_onShutdown){
_onShutdown(ex);
}
} }
if(ex){ if(ex){

View File

@ -549,6 +549,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
return false; return false;
} }
trackRef->_inited = true; //现在初始化 trackRef->_inited = true; //现在初始化
trackRef->_interleaved = trackRef->_type * 2;
if(_rtpType == Rtsp::RTP_Invalid){ if(_rtpType == Rtsp::RTP_Invalid){
auto strTransport = parser["Transport"]; auto strTransport = parser["Transport"];
@ -566,7 +567,6 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
switch (_rtpType) { switch (_rtpType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
trackRef->_interleaved = trackRef->_type * 2;
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";" << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";"

View File

@ -48,6 +48,8 @@ void rePushDelay(const string &schema,const string &vhost,const string &app, con
void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
//创建推流器并绑定一个MediaSource //创建推流器并绑定一个MediaSource
pusher.reset(new MediaPusher(schema,vhost, app, stream)); pusher.reset(new MediaPusher(schema,vhost, app, stream));
//可以指定rtsp推流方式支持tcp和udp方式默认tcp
// (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP;
//设置推流中断处理逻辑 //设置推流中断处理逻辑
pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) { pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) {
WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();