完善错误提示

This commit is contained in:
xiongziliang 2019-05-29 18:08:50 +08:00
parent f8f3c5dd19
commit a39c4c1344
22 changed files with 112 additions and 120 deletions

@ -1 +1 @@
Subproject commit 1eeb329f1ebbd7439b1147141de7cec3b5e881e8 Subproject commit bdc1fb594b96c0ec7b56cf2afffa1ff02341cdbb

View File

@ -237,5 +237,11 @@ void MediaInfo::parse(const string &url){
} }
} }
void MediaSourceEvent::onNoneReader(MediaSource &sender){
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId();
//没有任何读取器消费该源,表明该源可以关闭了
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender);
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -54,20 +54,17 @@ public:
MediaSourceEvent(){}; MediaSourceEvent(){};
virtual ~MediaSourceEvent(){}; virtual ~MediaSourceEvent(){};
public: public:
virtual bool seekTo(uint32_t ui32Stamp){ virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){
//拖动进度条 //拖动进度条
return false; return false;
} }
virtual bool close(bool force) { virtual bool close(MediaSource &sender,bool force) {
//通知其停止推流 //通知其停止推流
return false; return false;
} }
virtual void onNoneReader(MediaSource &sender){ virtual void onNoneReader(MediaSource &sender);
//没有任何读取器消费该源,表明该源可以关闭了
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender);
}
}; };
class MediaInfo{ class MediaInfo{
@ -149,7 +146,7 @@ public:
if(!listener){ if(!listener){
return false; return false;
} }
return listener->seekTo(ui32Stamp); return listener->seekTo(*this,ui32Stamp);
} }
virtual uint32_t getTimeStamp(TrackType trackType) = 0; virtual uint32_t getTimeStamp(TrackType trackType) = 0;
@ -159,7 +156,7 @@ public:
if(!listener){ if(!listener){
return false; return false;
} }
return listener->close(force); return listener->close(*this,force);
} }
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){ virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener; _listener = listener;

View File

@ -130,7 +130,7 @@ void HttpClient::onConnect(const SockException &ex) {
printer << pr.second + "\r\n"; printer << pr.second + "\r\n";
} }
send(printer << "\r\n"); send(printer << "\r\n");
onSend(); onFlush();
} }
void HttpClient::onRecv(const Buffer::Ptr &pBuf) { void HttpClient::onRecv(const Buffer::Ptr &pBuf) {
@ -222,12 +222,11 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) {
onResponseCompleted_l(); onResponseCompleted_l();
if(biggerThanExpected) { if(biggerThanExpected) {
//声明的content数据比真实的小那么我们只截取前面部分的并断开链接 //声明的content数据比真实的小那么我们只截取前面部分的并断开链接
shutdown(); shutdown(SockException(Err_shutdown, "http response content size bigger than expected"));
onDisconnect(SockException(Err_other, "http response content size bigger than expected"));
} }
} }
void HttpClient::onSend() { void HttpClient::onFlush() {
_aliveTicker.resetTime(); _aliveTicker.resetTime();
while (_body && _body->remainSize() && !isSocketBusy()) { while (_body && _body->remainSize() && !isSocketBusy()) {
auto buffer = _body->readData(); auto buffer = _body->readData();
@ -252,8 +251,7 @@ void HttpClient::onManager() {
if (_fTimeOutSec > 0 && _aliveTicker.elapsedTime() > _fTimeOutSec * 1000) { if (_fTimeOutSec > 0 && _aliveTicker.elapsedTime() > _fTimeOutSec * 1000) {
//超时 //超时
onDisconnect(SockException(Err_timeout, "http request timeout")); shutdown(SockException(Err_timeout, "http request timeout"));
shutdown();
} }
} }

View File

@ -299,7 +299,7 @@ protected:
virtual void onConnect(const SockException &ex) override; virtual void onConnect(const SockException &ex) override;
virtual void onRecv(const Buffer::Ptr &pBuf) override; virtual void onRecv(const Buffer::Ptr &pBuf) override;
virtual void onErr(const SockException &ex) override; virtual void onErr(const SockException &ex) override;
virtual void onSend() override; virtual void onFlush() override;
virtual void onManager() override; virtual void onManager() override;
private: private:
void onResponseCompleted_l(); void onResponseCompleted_l();

View File

@ -66,14 +66,7 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo
int64_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) { int64_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) {
if(status != "200" && status != "206"){ if(status != "200" && status != "206"){
//失败 //失败
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "Http Status:" << status));
closeFile();
File::delete_file(_filePath.data());
if(_onResult){
auto errMsg = StrPrinter << "Http Status:" << status << endl;
_onResult(Err_other,errMsg,_filePath);
_onResult = nullptr;
}
} }
//后续全部是content //后续全部是content
return -1; return -1;

View File

@ -101,6 +101,7 @@ get_mime_type(const char* name) {
HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
TraceP(this);
//设置15秒发送超时时间 //设置15秒发送超时时间
pSock->setSendTimeOutSecond(15); pSock->setSendTimeOutSecond(15);
//起始接收buffer缓存设置为4K节省内存 //起始接收buffer缓存设置为4K节省内存
@ -108,7 +109,7 @@ HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
} }
HttpSession::~HttpSession() { HttpSession::~HttpSession() {
//DebugL; TraceP(this);
} }
int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
@ -124,17 +125,16 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
string cmd = _parser.Method(); string cmd = _parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) { if (it == g_mapCmdIndex.end()) {
WarnP(this) << cmd;
sendResponse("403 Forbidden", makeHttpHeader(true), ""); sendResponse("403 Forbidden", makeHttpHeader(true), "");
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd));
return 0; return 0;
} }
//默认后面数据不是content而是header //默认后面数据不是content而是header
int64_t content_len = 0; int64_t content_len = 0;
auto &fun = it->second; auto &fun = it->second;
if(!(this->*fun)(content_len)){ if(!(this->*fun)(content_len)){
shutdown(); shutdown(SockException(Err_shutdown,"Connection: close"));
} }
//清空解析器节省内存 //清空解析器节省内存
_parser.Clear(); _parser.Clear();
@ -156,9 +156,13 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
} }
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
// WarnP(this) << err.what(); if(_ticker.createdTime() < 10 * 1000){
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); TraceP(this) << err.what();
}else{
WarnP(this) << err.what();
}
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo, _mediaInfo,
@ -174,8 +178,7 @@ void HttpSession::onManager() {
if(_ticker.elapsedTime() > keepAliveSec * 1000){ if(_ticker.elapsedTime() > keepAliveSec * 1000){
//1分钟超时 //1分钟超时
// WarnP(this) <<"HttpSession timeouted!"; shutdown(SockException(Err_timeout,"session timeouted"));
shutdown();
} }
} }
@ -233,7 +236,7 @@ inline bool HttpSession::checkLiveFlvStream(){
//未找到该流 //未找到该流
sendNotFound(bClose); sendNotFound(bClose);
if(bClose){ if(bClose){
shutdown(); shutdown(SockException(Err_shutdown,"flv stream not found"));
} }
return; return;
} }
@ -242,7 +245,7 @@ inline bool HttpSession::checkLiveFlvStream(){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err);
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return ; return ;
} }
@ -258,7 +261,7 @@ inline bool HttpSession::checkLiveFlvStream(){
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
}catch (std::exception &ex){ }catch (std::exception &ex){
//该rtmp源不存在 //该rtmp源不存在
shutdown(); shutdown(SockException(Err_shutdown,"rtmp mediasource released"));
} }
}; };
@ -446,7 +449,7 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) {
strongSelf->send(sendBuf); strongSelf->send(sendBuf);
} }
if(bClose) { if(bClose) {
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,"read file eof"));
} }
return false; return false;
} }
@ -645,7 +648,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){
} }
strongSelf->responseDelay(Origin,bClose,codeOut,headerOut,contentOut); strongSelf->responseDelay(Origin,bClose,codeOut,headerOut,contentOut);
if(bClose){ if(bClose){
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,"Connection: close"));
} }
}); });
}; };
@ -657,7 +660,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){
invoker("404 Not Found",KeyValue(),""); invoker("404 Not Found",KeyValue(),"");
if(bClose){ if(bClose){
//close类型回复完毕关闭连接 //close类型回复完毕关闭连接
shutdown(); shutdown(SockException(Err_shutdown,"404 Not Found"));
} }
} }
return consumed; return consumed;
@ -727,7 +730,7 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) {
} }
//连接类型是close类型收完content就关闭连接 //连接类型是close类型收完content就关闭连接
shutdown(); shutdown(SockException(Err_shutdown,"recv http content completed"));
//content已经接收完毕 //content已经接收完毕
return false ; return false ;
}; };
@ -763,7 +766,7 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) {
} }
void HttpSession::onDetach() { void HttpSession::onDetach() {
shutdown(); shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
} }
std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){ std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){

View File

@ -83,13 +83,11 @@ protected:
uint64_t len, uint64_t len,
uint64_t totalSize, uint64_t totalSize,
uint64_t recvedSize){ uint64_t recvedSize){
WarnL << "content数据长度过大无法处理,请重载HttpSession::onRecvUnlimitedContent"; shutdown(SockException(Err_shutdown,"http post content is too huge,default closed"));
shutdown();
} }
void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
DebugL << "默认关闭WebSocket"; shutdown(SockException(Err_shutdown,"websocket connection default closed"));
shutdown();
}; };
void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){ void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){

View File

@ -167,15 +167,16 @@ void MediaReader::startReadMP4() {
readSample(sampleMS, false); readSample(sampleMS, false);
_mediaMuxer->setListener(strongSelf); _mediaMuxer->setListener(strongSelf);
} }
bool MediaReader::seekTo(uint32_t ui32Stamp){ bool MediaReader::seekTo(MediaSource &sender,uint32_t ui32Stamp){
seek(ui32Stamp); seek(ui32Stamp);
return true; return true;
} }
bool MediaReader::close(bool force){ bool MediaReader::close(MediaSource &sender,bool force){
if(!force && _mediaMuxer->readerCount() != 0 ){ if(!force && _mediaMuxer->readerCount() != 0 ){
return false; return false;
} }
_timer.reset(); _timer.reset();
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
return true; return true;
} }

View File

@ -62,13 +62,13 @@ public:
* @param ui32Stamp * @param ui32Stamp
* @return * @return
*/ */
bool seekTo(uint32_t ui32Stamp) override; bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override;
/** /**
* MediaReader的流化进程 * MediaReader的流化进程
* @return * @return
*/ */
bool close(bool force) override; bool close(MediaSource &sender,bool force) override;
/** /**
* MediaReader对象然后查找相关的MediaSource对象 * MediaReader对象然后查找相关的MediaSource对象

View File

@ -145,7 +145,7 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
return false; return false;
}, getPoller()); }, getPoller());
} }
bool PlayerProxy::close(bool force) { bool PlayerProxy::close(MediaSource &sender,bool force) {
if(!force && _mediaMuxer->readerCount() != 0){ if(!force && _mediaMuxer->readerCount() != 0){
return false; return false;
} }
@ -162,6 +162,7 @@ bool PlayerProxy::close(bool force) {
} }
} }
}); });
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
return true; return true;
} }

View File

@ -79,7 +79,7 @@ public:
* *
* @return * @return
*/ */
bool close(bool force) override; bool close(MediaSource &sender,bool force) override;
private: private:
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
void rePlay(const string &strUrl,int iFailedCnt); void rePlay(const string &strUrl,int iFailedCnt);

View File

@ -70,7 +70,7 @@ void RtmpPlayer::teardown() {
CLEAR_ARR(_aiFistStamp); CLEAR_ARR(_aiFistStamp);
CLEAR_ARR(_aiNowStamp); CLEAR_ARR(_aiNowStamp);
reset(); reset();
shutdown(); shutdown(SockException(Err_shutdown,"teardown"));
} }
} }
void RtmpPlayer::play(const string &strUrl) { void RtmpPlayer::play(const string &strUrl) {

View File

@ -59,7 +59,7 @@ void RtmpPusher::teardown() {
} }
_pPublishTimer.reset(); _pPublishTimer.reset();
reset(); reset();
shutdown(); shutdown(SockException(Err_shutdown,"teardown"));
} }
} }

View File

@ -65,15 +65,13 @@ void RtmpSession::onError(const SockException& err) {
void RtmpSession::onManager() { void RtmpSession::onManager() {
if (_ticker.createdTime() > 15 * 1000) { if (_ticker.createdTime() > 15 * 1000) {
if (!_pRingReader && !_pPublisherSrc) { if (!_pRingReader && !_pPublisherSrc) {
WarnP(this) << "非法链接"; shutdown(SockException(Err_timeout,"illegal connection"));
shutdown();
} }
} }
if (_pPublisherSrc) { if (_pPublisherSrc) {
//publisher //publisher
if (_ticker.elapsedTime() > 15 * 1000) { if (_ticker.elapsedTime() > 15 * 1000) {
WarnP(this) << "数据接收超时"; shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout"));
shutdown();
} }
} }
} }
@ -84,8 +82,7 @@ void RtmpSession::onRecv(const Buffer::Ptr &pBuf) {
_ui64TotalBytes += pBuf->size(); _ui64TotalBytes += pBuf->size();
onParseRtmp(pBuf->data(), pBuf->size()); onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) { } catch (exception &e) {
WarnP(this) << e.what(); shutdown(SockException(Err_shutdown,StrPrinter << "catch exception:" << e.what()));
shutdown();
} }
} }
@ -159,12 +156,11 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
status.set("clientid", "0"); status.set("clientid", "0");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
if (!ok) { if (!ok) {
WarnP(this) << "onPublish:" string errMsg = StrPrinter << (authSuccess ? "already publishing:" : err.data()) << " "
<< (authSuccess ? "Already publishing:" : err.data()) << " " << _mediaInfo._vhost << " "
<< _mediaInfo._vhost << " " << _mediaInfo._app << " "
<< _mediaInfo._app << " " << _mediaInfo._streamid;
<< _mediaInfo._streamid << endl; shutdown(SockException(Err_shutdown,errMsg));
shutdown();
return; return;
} }
_pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid)); _pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid));
@ -222,12 +218,11 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
status.set("clientid", "0"); status.set("clientid", "0");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
if (!ok) { if (!ok) {
WarnP(this) << (authSuccess ? "No such stream:" : err.data()) << " " string errMsg = StrPrinter << (authSuccess ? "no such stream:" : err.data()) << " "
<< _mediaInfo._vhost << " " << _mediaInfo._vhost << " "
<< _mediaInfo._app << " " << _mediaInfo._app << " "
<< _mediaInfo._streamid << _mediaInfo._streamid;
<< endl; shutdown(SockException(Err_shutdown,errMsg));
shutdown();
return; return;
} }
@ -286,7 +281,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
}); });
_pPlayerSrc = src; _pPlayerSrc = src;
if (src->readerCount() == 1) { if (src->readerCount() == 1) {

View File

@ -81,12 +81,12 @@ private:
sendResponse(MSG_CMD, invoke.data()); sendResponse(MSG_CMD, invoke.data());
} }
bool close(bool force) override { bool close(MediaSource &sender,bool force) override {
if(!force && _pPublisherSrc->readerCount() != 0){ if(!force && _pPublisherSrc->readerCount() != 0){
return false; return false;
} }
InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
safeShutdown(); safeShutdown(SockException(Err_shutdown,err));
return true; return true;
} }
private: private:

View File

@ -51,7 +51,7 @@ RtspPlayer::~RtspPlayer(void) {
void RtspPlayer::teardown(){ void RtspPlayer::teardown(){
if (alive()) { if (alive()) {
sendRtspRequest("TEARDOWN" ,_strContentBase); sendRtspRequest("TEARDOWN" ,_strContentBase);
shutdown(); shutdown(SockException(Err_shutdown,"teardown"));
} }
_rtspMd5Nonce.clear(); _rtspMd5Nonce.clear();

View File

@ -25,7 +25,7 @@ RtspPusher::~RtspPusher() {
void RtspPusher::teardown() { void RtspPusher::teardown() {
if (alive()) { if (alive()) {
sendRtspRequest("TEARDOWN" ,_strContentBase); sendRtspRequest("TEARDOWN" ,_strContentBase);
shutdown(); shutdown(SockException(Err_shutdown,"teardown"));
} }
reset(); reset();
@ -329,7 +329,7 @@ inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) {
int iTrackIndex = getTrackIndexByTrackType(pkt->type); int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto &pSock = _apUdpSock[iTrackIndex]; auto &pSock = _apUdpSock[iTrackIndex];
if (!pSock) { if (!pSock) {
shutdown(); shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
return; return;
} }
BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); BufferRtp::Ptr buffer(new BufferRtp(pkt,4));

View File

@ -72,11 +72,11 @@ static recursive_mutex g_mtxGetter;
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
//设置15秒发送超时时间 DebugP(this);
pSock->setSendTimeOutSecond(15); //设置15秒发送超时时间
//起始接收buffer缓存设置为4K节省内存 pSock->setSendTimeOutSecond(15);
pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024)); //起始接收buffer缓存设置为4K节省内存
DebugP(this); pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
} }
RtspSession::~RtspSession() { RtspSession::~RtspSession() {
@ -84,7 +84,7 @@ RtspSession::~RtspSession() {
} }
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
TraceP(this) << err.getErrCode() << " " << err.what(); WarnP(this) << err.what();
if (_rtpType == Rtsp::RTP_MULTICAST) { if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
@ -113,8 +113,7 @@ void RtspSession::onError(const SockException& err) {
void RtspSession::onManager() { void RtspSession::onManager() {
if (_ticker.createdTime() > 15 * 1000) { if (_ticker.createdTime() > 15 * 1000) {
if (_strSession.size() == 0) { if (_strSession.size() == 0) {
WarnP(this) << "非法链接"; shutdown(SockException(Err_timeout,"illegal connection"));
shutdown();
return; return;
} }
} }
@ -122,9 +121,8 @@ void RtspSession::onManager() {
if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) {
//如果是推流端或者rtp over udp类型的播放端那么就做超时检测 //如果是推流端或者rtp over udp类型的播放端那么就做超时检测
WarnP(this) << "RTSP会话超时"; shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
shutdown(); return;
return;
} }
} }
@ -169,11 +167,10 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
if (it != s_handler_map.end()) { if (it != s_handler_map.end()) {
auto &fun = it->second; auto &fun = it->second;
if(!(this->*fun)(parser)){ if(!(this->*fun)(parser)){
shutdown(); shutdown(SockException(Err_shutdown,"self close"));
} }
} else{ } else{
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
WarnP(this) << "不支持的rtsp命令:" << strCmd;
} }
} }
@ -252,7 +249,7 @@ bool RtspSession::handleReq_RECORD(const Parser &parser){
if(!authSuccess){ if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权 //第一次play是播放否则是恢复播放。只对播放鉴权
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return; return;
} }
@ -260,8 +257,8 @@ bool RtspSession::handleReq_RECORD(const Parser &parser){
for(auto &track : _aTrackInfo){ for(auto &track : _aTrackInfo){
if (track->_inited == false) { if (track->_inited == false) {
//还有track没有setup //还有track没有setup
shutdown(); shutdown(SockException(Err_shutdown,"track not setuped"));
return; return;
} }
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ","; rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
} }
@ -312,9 +309,9 @@ bool RtspSession::handleReq_Describe(const Parser &parser) {
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src); auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) { if (!rtsp_src) {
//未找到相应的MediaSource //未找到相应的MediaSource
WarnP(strongSelf.get()) << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,err));
return; return;
} }
//找到了响应的rtsp流 //找到了响应的rtsp流
@ -324,7 +321,7 @@ bool RtspSession::handleReq_Describe(const Parser &parser) {
if (strongSelf->_aTrackInfo.empty()) { if (strongSelf->_aTrackInfo.empty()) {
//该流无效 //该流无效
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp"));
return; return;
} }
strongSelf->_strSession = makeRandStr(12); strongSelf->_strSession = makeRandStr(12);
@ -664,7 +661,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->safeShutdown(); strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
}); });
} }
int iSrvPort = _pBrdcaster->getPort(trackRef->_type); int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
@ -707,14 +704,14 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
if(!authSuccess){ if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权 //第一次play是播放否则是恢复播放。只对播放鉴权
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
shutdown(); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return; return;
} }
auto pMediaSrc = _pMediaSrc.lock(); auto pMediaSrc = _pMediaSrc.lock();
if(!pMediaSrc){ if(!pMediaSrc){
send_StreamNotFound(); send_StreamNotFound();
shutdown(); shutdown(SockException(Err_shutdown,"rtsp stream released"));
return; return;
} }
@ -740,8 +737,8 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
for(auto &track : _aTrackInfo){ for(auto &track : _aTrackInfo){
if (track->_inited == false) { if (track->_inited == false) {
//还有track没有setup //还有track没有setup
shutdown(); shutdown(SockException(Err_shutdown,"track not setuped"));
return; return;
} }
track->_ssrc = pMediaSrc->getSsrc(track->_type); track->_ssrc = pMediaSrc->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type); track->_seq = pMediaSrc->getSeqence(track->_type);
@ -773,8 +770,8 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->shutdown(); strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
}); });
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { _pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
@ -867,8 +864,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) {
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
auto httpGetterStrong = httpGetterWeak.lock(); auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){ if(!httpGetterStrong){
WarnP(this) << "Http Getter已经释放"; shutdown(SockException(Err_shutdown,"http getter released"));
shutdown();
return; return;
} }
@ -1085,12 +1081,12 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
return -1; return -1;
} }
bool RtspSession::close(bool force) { bool RtspSession::close(MediaSource &sender,bool force) {
if(!force && _pushSrc->readerCount() != 0){ if(!force && _pushSrc->readerCount() != 0){
return false; return false;
} }
InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
safeShutdown(); safeShutdown(SockException(Err_shutdown,err));
return true; return true;
} }
@ -1107,7 +1103,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
int iTrackIndex = getTrackIndexByTrackType(pkt->type); int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto &pSock = _apRtpSock[iTrackIndex]; auto &pSock = _apRtpSock[iTrackIndex];
if (!pSock) { if (!pSock) {
shutdown(); shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
return; return;
} }
BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); BufferRtp::Ptr buffer(new BufferRtp(pkt,4));

View File

@ -105,7 +105,7 @@ protected:
//RtpReceiver override //RtpReceiver override
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override //MediaSourceEvent override
bool close(bool force) override ; bool close(MediaSource &sender,bool force) override ;
//TcpSession override //TcpSession override
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;

View File

@ -39,26 +39,26 @@ static onceToken s_token([]() {
}, nullptr); }, nullptr);
ShellSession::ShellSession(const Socket::Ptr &_sock) : TcpSession(_sock) { ShellSession::ShellSession(const Socket::Ptr &_sock) : TcpSession(_sock) {
DebugP(this);
pleaseInputUser(); pleaseInputUser();
} }
ShellSession::~ShellSession() { ShellSession::~ShellSession() {
DebugP(this);
} }
void ShellSession::onRecv(const Buffer::Ptr&buf) { void ShellSession::onRecv(const Buffer::Ptr&buf) {
//DebugL << hexdump(buf->data(), buf->size()); //DebugL << hexdump(buf->data(), buf->size());
GET_CONFIG(uint32_t,maxReqSize,Shell::kMaxReqSize); GET_CONFIG(uint32_t,maxReqSize,Shell::kMaxReqSize);
if (_strRecvBuf.size() + buf->size() >= maxReqSize) { if (_strRecvBuf.size() + buf->size() >= maxReqSize) {
WarnL << "接收缓冲区溢出!"; shutdown(SockException(Err_other,"recv buffer overflow"));
shutdown();
return; return;
} }
_beatTicker.resetTime(); _beatTicker.resetTime();
_strRecvBuf.append(buf->data(), buf->size()); _strRecvBuf.append(buf->data(), buf->size());
if (_strRecvBuf.find("\xff\xf4\xff\0xfd\x06") != std::string::npos) { if (_strRecvBuf.find("\xff\xf4\xff\0xfd\x06") != std::string::npos) {
WarnL << "收到Ctrl+C.";
send("\033[0m\r\n Bye bye!\r\n"); send("\033[0m\r\n Bye bye!\r\n");
shutdown(); shutdown(SockException(Err_other,"received Ctrl+C"));
return; return;
} }
size_t index; size_t index;
@ -67,16 +67,20 @@ void ShellSession::onRecv(const Buffer::Ptr&buf) {
line = _strRecvBuf.substr(0, index); line = _strRecvBuf.substr(0, index);
_strRecvBuf.erase(0, index + 2); _strRecvBuf.erase(0, index + 2);
if (!onCommandLine(line)) { if (!onCommandLine(line)) {
shutdown(); shutdown(SockException(Err_other,"exit cmd"));
return; return;
} }
} }
} }
void ShellSession::onError(const SockException &err){
WarnP(this) << err.what();
}
void ShellSession::onManager() { void ShellSession::onManager() {
if (_beatTicker.elapsedTime() > 1000 * 60 * 5) { if (_beatTicker.elapsedTime() > 1000 * 60 * 5) {
//5 miniutes for alive //5 miniutes for alive
shutdown(); shutdown(SockException(Err_timeout,"session timeout"));
return; return;
} }
} }

View File

@ -41,7 +41,7 @@ public:
virtual ~ShellSession(); virtual ~ShellSession();
void onRecv(const Buffer::Ptr &) override; void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override {}; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
private: private: