diff --git a/ZLToolKit b/ZLToolKit index 15d6ebbf..843a3238 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit 15d6ebbf2e09ccde3dc0e467df84401dc57b475d +Subproject commit 843a323834e9b6c164dcd620b196e3742c39f016 diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index f06dfd93..c569d891 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -232,7 +232,7 @@ inline bool HttpSession::checkLiveFlvStream(){ (*this) << SocketFlags(kSockFlags); try{ - start(mediaSrc); + start(getPoller(),mediaSrc); }catch (std::exception &ex){ //该rtmp源不存在 shutdown(); @@ -683,36 +683,21 @@ inline void HttpSession::sendNotFound(bool bClose) { void HttpSession::onWrite(const Buffer::Ptr &buffer) { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - async([weakSelf,buffer](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->_ticker.resetTime(); - strongSelf->_ui64TotalBytes += buffer->size(); - strongSelf->send(buffer); - }); + _ticker.resetTime(); + _ui64TotalBytes += buffer->size(); + send(buffer); } void HttpSession::onWrite(const char *data, int len) { BufferRaw::Ptr buffer(new BufferRaw); buffer->assign(data,len); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - async([weakSelf,buffer](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->_ticker.resetTime(); - strongSelf->_ui64TotalBytes += buffer->size(); - strongSelf->send(buffer); - }); + _ticker.resetTime(); + _ui64TotalBytes += buffer->size(); + send(buffer); } void HttpSession::onDetach() { - safeShutdown(); + shutdown(); } std::shared_ptr HttpSession::getSharedPtr(){ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 74292dcf..3a0d350d 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -203,7 +203,7 @@ inline void RtmpPusher::send_metaData(){ sendRtmp(pkt->typeId, _ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId ); }); - _pRtmpReader = src->getRing()->attach(); + _pRtmpReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ auto strongSelf = weakSelf.lock(); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 84db6650..e99b4853 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -336,7 +336,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr onSendMedia(pkt); }); - _pRingReader = src->getRing()->attach(); + _pRingReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); SockUtil::setNoDelay(_sock->rawFD(), false); _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { @@ -344,20 +344,14 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr if (!strongSelf) { return; } - strongSelf->async([weakSelf, pkt]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - strongSelf->onSendMedia(pkt); - }); + strongSelf->onSendMedia(pkt); }); _pRingReader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; } - strongSelf->safeShutdown(); + strongSelf->shutdown(); }); _pPlayerSrc = src; if (src->getRing()->readerCount() == 1) { @@ -446,13 +440,7 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { if(!strongSelf) { return; } - strongSelf->async([weakSelf,pkt]() { - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->onSendMedia(pkt); - }); + strongSelf->onSendMedia(pkt); }); } } diff --git a/src/RtmpMuxer/FlvMuxer.cpp b/src/RtmpMuxer/FlvMuxer.cpp index 0bc4f473..a1512103 100644 --- a/src/RtmpMuxer/FlvMuxer.cpp +++ b/src/RtmpMuxer/FlvMuxer.cpp @@ -38,7 +38,7 @@ FlvMuxer::FlvMuxer() { FlvMuxer::~FlvMuxer() { } -void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { +void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media) { if(!media){ throw std::runtime_error("RtmpMediaSource 无效"); } @@ -46,7 +46,7 @@ void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { onWriteFlvHeader(media); std::weak_ptr weakSelf = getSharedPtr(); - _ring_reader = media->getRing()->attach(); + _ring_reader = media->getRing()->attach(poller); _ring_reader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ @@ -189,11 +189,11 @@ void FlvMuxer::stop() { } ///////////////////////////////////////////////////////FlvRecorder///////////////////////////////////////////////////// -void FlvRecorder::startRecord(const string &vhost, const string &app, const string &stream,const string &file_path) { - startRecord(dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path); +void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const string &vhost, const string &app, const string &stream,const string &file_path) { + startRecord(poller,dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path); } -void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) { +void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media, const string &file_path) { stop(); lock_guard lck(_file_mtx); //开辟文件写缓存 @@ -215,7 +215,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f //设置文件写缓存 setvbuf( _file.get(), fileBuf.get(),_IOFBF, FILE_BUF_SIZE); - start(media); + start(poller,media); } void FlvRecorder::onWrite(const Buffer::Ptr &data) { diff --git a/src/RtmpMuxer/FlvMuxer.h b/src/RtmpMuxer/FlvMuxer.h index 56c87986..59283676 100644 --- a/src/RtmpMuxer/FlvMuxer.h +++ b/src/RtmpMuxer/FlvMuxer.h @@ -41,7 +41,7 @@ public: virtual ~FlvMuxer(); void stop(); protected: - void start(const RtmpMediaSource::Ptr &media); + void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media); virtual void onWrite(const Buffer::Ptr &data) = 0; virtual void onWrite(const char *data,int len) = 0; virtual void onDetach() = 0; @@ -55,6 +55,7 @@ private: RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; uint32_t _aui32FirstStamp[2] = {0}; uint32_t _previousTagSize = 0; + }; class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this{ @@ -62,8 +63,8 @@ public: typedef std::shared_ptr Ptr; FlvRecorder(); virtual ~FlvRecorder(); - void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path); - void startRecord(const RtmpMediaSource::Ptr &media,const string &file_path); + void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path); + void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path); private: virtual void onWrite(const Buffer::Ptr &data) override ; virtual void onWrite(const char *data,int len) override; diff --git a/src/Rtsp/RtpBroadCaster.cpp b/src/Rtsp/RtpBroadCaster.cpp index ec6077c9..fc5d8003 100644 --- a/src/Rtsp/RtpBroadCaster.cpp +++ b/src/Rtsp/RtpBroadCaster.cpp @@ -124,7 +124,7 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,c bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); _apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr); } - _pReader = src->getRing()->attach(); + _pReader = src->getRing()->attach(nullptr); _pReader->setReadCB([this](const RtpPacket::Ptr &pkt){ int i = (int)(pkt->type); auto &pSock = _apUdpSock[i]; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 0dcec6cc..a33cc6af 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -74,7 +74,7 @@ static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { //设置15秒发送超时时间 - pSock->setSendTimeOutSecond(15); + pSock->setSendTimeOutSecond(45); DebugL << get_peer_ip(); } @@ -628,7 +628,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { if(!strongSelf) { return; } - strongSelf->safeShutdown(); + strongSelf->shutdown(); }); } int iSrvPort = _pBrdcaster->getPort(trackRef->_type); @@ -731,28 +731,22 @@ bool RtspSession::handleReq_Play(const Parser &parser) { if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtpReader = pMediaSrc->getRing()->attach(useBuf); + _pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf); _pRtpReader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->safeShutdown(); + strongSelf->shutdown(); }); _pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->async([weakSelf,pack](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - if(strongSelf->_enableSendRtp) { - strongSelf->sendRtpPacket(pack); - } - }); + if(strongSelf->_enableSendRtp) { + strongSelf->sendRtpPacket(pack); + } }); } }; diff --git a/tests/test_server.cpp b/tests/test_server.cpp index 3f82cd6c..710e8391 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -192,7 +192,7 @@ static onceToken s_token([](){ auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv"; FlvRecorder::Ptr recorder(new FlvRecorder); try{ - recorder->startRecord(dynamic_pointer_cast(sender.shared_from_this()),path); + recorder->startRecord(nullptr,dynamic_pointer_cast(sender.shared_from_this()),path); s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder; }catch(std::exception &ex){ WarnL << ex.what();