From b1666eb651c14cf167d1473c451231d5c7da85b3 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 29 Sep 2021 00:04:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=AA=92=E4=BD=93=E6=BA=90pa?= =?UTF-8?q?use/speed=E6=8E=A5=E5=8F=A3:#1129?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 2 ++ src/Record/MP4Reader.cpp | 53 ++++++++++++++++++++++++++++++++----- src/Record/MP4Reader.h | 7 ++++- src/Rtmp/FlvMuxer.cpp | 1 + src/Rtmp/RtmpPusher.cpp | 1 + src/Rtmp/RtmpSession.cpp | 10 ++----- src/Rtmp/RtmpSession.h | 1 - src/Rtsp/RtpMultiCaster.cpp | 1 + src/Rtsp/Rtsp.h | 26 +++++++++++------- src/Rtsp/RtspMuxer.cpp | 28 ++++++++++++-------- src/Rtsp/RtspMuxer.h | 1 + src/Rtsp/RtspPusher.cpp | 1 + src/Rtsp/RtspSession.cpp | 28 +++++++++----------- src/Rtsp/RtspSession.h | 2 -- webrtc/WebRtcTransport.cpp | 1 + 15 files changed, 108 insertions(+), 55 deletions(-) diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 260a023e..cd8d346a 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -269,6 +269,7 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ setSocketFlags(); onWrite(std::make_shared(fmp4_src->getInitSegment()), true); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + fmp4_src->pause(false); _fmp4_reader = fmp4_src->getRing()->attach(getPoller()); _fmp4_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); @@ -309,6 +310,7 @@ bool HttpSession::checkLiveStreamTS(const function &cb){ //直播牺牲延时提升发送性能 setSocketFlags(); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + ts_src->pause(false); _ts_reader = ts_src->getRing()->attach(getPoller()); _ts_reader->setDetachCB([weak_self](){ auto strong_self = weak_self.lock(); diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index 4363733b..09af95ee 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -47,6 +47,12 @@ MP4Reader::MP4Reader(const string &strVhost,const string &strApp, const string & } bool MP4Reader::readSample() { + if (_paused) { + //确保暂停时,时间轴不走动 + _seek_ticker.resetTime(); + return true; + } + bool keyFrame = false; bool eof = false; while (!eof) { @@ -89,20 +95,53 @@ void MP4Reader::startReadMP4() { } uint32_t MP4Reader::getCurrentStamp() { - return (uint32_t)(_seek_to + _seek_ticker.elapsedTime()); + return (uint32_t)(_seek_to + !_paused * _speed * _seek_ticker.elapsedTime()); } -void MP4Reader::setCurrentStamp(uint32_t ui32Stamp){ - _seek_to = ui32Stamp; +void MP4Reader::setCurrentStamp(uint32_t new_stamp){ + auto old_stamp = getCurrentStamp(); + _seek_to = new_stamp; _seek_ticker.resetTime(); - _mediaMuxer->setTimeStamp(ui32Stamp); + if (old_stamp != new_stamp) { + //时间轴未拖动时不操作 + _mediaMuxer->setTimeStamp(new_stamp); + } } -bool MP4Reader::seekTo(MediaSource &sender,uint32_t ui32Stamp){ - return seekTo(ui32Stamp); +bool MP4Reader::seekTo(MediaSource &sender, uint32_t stamp) { + //拖动进度条后应该恢复播放 + pause(sender, false); + TraceL << getOriginUrl(sender) << ",stamp:" << stamp; + return seekTo(stamp); } -bool MP4Reader::seekTo(uint32_t ui32Stamp){ +bool MP4Reader::pause(MediaSource &sender, bool pause) { + if (_paused == pause) { + return true; + } + //_seek_ticker重新计时,不管是暂停还是seek都不影响总的播放进度 + setCurrentStamp(getCurrentStamp()); + _paused = pause; + TraceL << getOriginUrl(sender) << ",pause:" << pause; + return true; +} + +bool MP4Reader::speed(MediaSource &sender, float speed) { + if (speed < 0.1 && speed > 20) { + WarnL << "播放速度取值范围非法:" << speed; + return false; + } + //设置播放速度后应该恢复播放 + pause(sender, false); + if (_speed == speed) { + return true; + } + _speed = speed; + TraceL << getOriginUrl(sender) << ",speed:" << speed; + return true; +} + +bool MP4Reader::seekTo(uint32_t ui32Stamp) { lock_guard lck(_mtx); if (ui32Stamp > _demuxer->getDurationMS()) { //超过文件长度 diff --git a/src/Record/MP4Reader.h b/src/Record/MP4Reader.h index c3791fa6..0e8cc764 100644 --- a/src/Record/MP4Reader.h +++ b/src/Record/MP4Reader.h @@ -38,7 +38,10 @@ public: private: //MediaSourceEvent override - bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; + bool seekTo(MediaSource &sender,uint32_t stamp) override; + bool pause(MediaSource &sender, bool pause) override; + bool speed(MediaSource &sender, float speed) override; + bool close(MediaSource &sender,bool force) override; int totalReaderCount(MediaSource &sender) override; MediaOriginType getOriginType(MediaSource &sender) const override; @@ -51,6 +54,8 @@ private: private: bool _have_video = false; + bool _paused = false; + float _speed = 1.0; uint32_t _seek_to; string _file_path; recursive_mutex _mtx; diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index ab105a28..84f0eb1a 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -36,6 +36,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr onWriteFlvHeader(media); std::weak_ptr weakSelf = getSharedPtr(); + media->pause(false); _ring_reader = media->getRing()->attach(poller); _ring_reader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 9c3567c0..02facae1 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -204,6 +204,7 @@ inline void RtmpPusher::send_metaData(){ sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id); }); + src->pause(false); _rtmp_reader = src->getRing()->attach(getPoller()); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); _rtmp_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) { diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 163a5800..3f9b4355 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -268,6 +268,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr //音频同步于视频 _stamp[0].syncTo(_stamp[1]); + src->pause(false); _ring_reader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { @@ -275,9 +276,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr if (!strongSelf) { return; } - if(strongSelf->_paused){ - return; - } size_t i = 0; auto size = pkt->size(); strongSelf->setSendFlushFlag(false); @@ -295,10 +293,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr } strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); }); + src->pause(false); _player_src = src; - if (src->totalReaderCount() == 1) { - src->seekTo(0); - } //提高服务器发送性能 setSocketFlags(); } @@ -411,8 +407,6 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { sendReply("onStatus", nullptr, status); //streamBegin sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA); - _paused = paused; - auto strongSrc = _player_src.lock(); if (strongSrc) { strongSrc->pause(paused); diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 817d9ec5..e6e4cd0e 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -87,7 +87,6 @@ private: void dumpMetadata(const AMFValue &metadata); private: - bool _paused = false; bool _set_meta_data = false; double _recv_req_id = 0; //消耗的总流量 diff --git a/src/Rtsp/RtpMultiCaster.cpp b/src/Rtsp/RtpMultiCaster.cpp index 3ff6d35e..3c24aa68 100644 --- a/src/Rtsp/RtpMultiCaster.cpp +++ b/src/Rtsp/RtpMultiCaster.cpp @@ -132,6 +132,7 @@ RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, con _udp_sock[i]->bindPeerAddr((struct sockaddr *) &peer); } + src->pause(false); _rtp_reader = src->getRing()->attach(helper.getPoller()); _rtp_reader->setReadCB([this](const RtspMediaSource::RingDataType &pkt) { size_t i = 0; diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index f5d47abc..1e566959 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -334,7 +334,7 @@ private: */ class TitleSdp : public Sdp{ public: - + using Ptr = std::shared_ptr; /** * 构造title类型sdp * @param dur_sec rtsp点播时长,0代表直播,单位秒 @@ -342,12 +342,12 @@ public: * @param version sdp版本 */ TitleSdp(float dur_sec = 0, - const map &header = map(), - int version = 0) : Sdp(0,0){ + const map &header = map(), + int version = 0) : Sdp(0, 0) { _printer << "v=" << version << "\r\n"; - if(!header.empty()){ - for (auto &pr : header){ + if (!header.empty()) { + for (auto &pr : header) { _printer << pr.first << "=" << pr.second << "\r\n"; } } else { @@ -357,23 +357,31 @@ public: _printer << "t=0 0\r\n"; } - if(dur_sec <= 0){ + if (dur_sec <= 0) { //直播 _printer << "a=range:npt=now-\r\n"; - }else{ + } else { //点播 - _printer << "a=range:npt=0-" << dur_sec << "\r\n"; + _dur_sec = dur_sec; + _printer << "a=range:npt=0-" << dur_sec << "\r\n"; } _printer << "a=control:*\r\n"; } + string getSdp() const override { return _printer; } - CodecId getCodecId() const override{ + CodecId getCodecId() const override { return CodecInvalid; } + + float getDuration() const { + return _dur_sec; + } + private: + float _dur_sec = 0; _StrPrinter _printer; }; diff --git a/src/Rtsp/RtspMuxer.cpp b/src/Rtsp/RtspMuxer.cpp index f9c9cdd2..037567e1 100644 --- a/src/Rtsp/RtspMuxer.cpp +++ b/src/Rtsp/RtspMuxer.cpp @@ -14,18 +14,23 @@ namespace mediakit { void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) { - if (_rtp_stamp[in->type] != in->getHeader()->stamp) { - //rtp时间戳变化才计算ntp,节省cpu资源 - int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate; - int64_t stamp_ms_inc; - //求rtp时间戳增量 - _stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc); - _rtp_stamp[in->type] = in->getHeader()->stamp; - _ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start; - } + if (_live) { + if (_rtp_stamp[in->type] != in->getHeader()->stamp) { + //rtp时间戳变化才计算ntp,节省cpu资源 + int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate; + int64_t stamp_ms_inc; + //求rtp时间戳增量 + _stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc); + _rtp_stamp[in->type] = in->getHeader()->stamp; + _ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start; + } - //rtp拦截入口,此处统一赋值ntp - in->ntp_stamp = _ntp_stamp[in->type]; + //rtp拦截入口,此处统一赋值ntp + in->ntp_stamp = _ntp_stamp[in->type]; + } else { + //点播情况下设置ntp时间戳为rtp时间戳 + in->ntp_stamp = in->getStamp() * uint64_t(1000) / in->sample_rate; + } _rtpRing->write(std::move(in), is_key); } @@ -33,6 +38,7 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title) { if (!title) { _sdp = std::make_shared()->getSdp(); } else { + _live = title->getDuration() == 0; _sdp = title->getSdp(); } _rtpRing = std::make_shared(); diff --git a/src/Rtsp/RtspMuxer.h b/src/Rtsp/RtspMuxer.h index 5dd36356..8194f1cd 100644 --- a/src/Rtsp/RtspMuxer.h +++ b/src/Rtsp/RtspMuxer.h @@ -81,6 +81,7 @@ private: void trySyncTrack(); private: + bool _live = true; uint32_t _rtp_stamp[TrackMax]{0}; uint64_t _ntp_stamp[TrackMax]{0}; uint64_t _ntp_stamp_start; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 257526bb..c7fee8e1 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -452,6 +452,7 @@ void RtspPusher::sendRecord() { throw std::runtime_error("the media source was released"); } + src->pause(false); _rtsp_reader = src->getRing()->attach(getPoller()); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); _rtsp_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 597e1964..a6b502f0 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -105,7 +105,7 @@ void RtspSession::onManager() { return; } - if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _enable_send_rtp && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) { + if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) { //rtp over udp播放器超时 shutdown(SockException(Err_timeout, "rtp over udp player timeout")); } @@ -774,12 +774,12 @@ void RtspSession::handleReq_Play(const Parser &parser) { } bool useGOP = true; - float iStartTime = 0; - auto &strRange = parser["Range"]; auto &strScale = parser["Scale"]; - + auto &strRange = parser["Range"]; + StrCaseMap res_header; if (!strScale.empty()) { //这是设置播放速度 + res_header.emplace("Scale", strScale); auto speed = atof(strScale.data()); play_src->speed(speed); InfoP(this) << "rtsp set play speed:" << speed; @@ -787,12 +787,12 @@ void RtspSession::handleReq_Play(const Parser &parser) { if (!strRange.empty()) { //这是seek操作 - _enable_send_rtp = false; + res_header.emplace("Range", strRange); auto strStart = FindField(strRange.data(), "npt=", "-"); if (strStart == "now") { strStart = "0"; } - iStartTime = 1000 * (float) atof(strStart.data()); + auto iStartTime = 1000 * (float) atof(strStart.data()); useGOP = !play_src->seekTo((uint32_t) iStartTime); InfoP(this) << "rtsp seekTo(ms):" << iStartTime; } @@ -814,13 +814,13 @@ void RtspSession::handleReq_Play(const Parser &parser) { } rtp_info.pop_back(); - sendRtspResponse("200 OK", - {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000), - "RTP-Info",rtp_info - }); + + res_header.emplace("RTP-Info", rtp_info); + //已存在Range时不覆盖 + res_header.emplace("Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << play_src->getTimeStamp(TrackInvalid) / 1000.0); + sendRtspResponse("200 OK", res_header); //在回复rtsp信令后再恢复播放 - _enable_send_rtp = true; play_src->pause(false); setSocketFlags(); @@ -840,9 +840,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { if (!strongSelf) { return; } - if (strongSelf->_enable_send_rtp) { - strongSelf->sendRtpPacket(pack); - } + strongSelf->sendRtpPacket(pack); }); } } @@ -854,8 +852,6 @@ void RtspSession::handleReq_Pause(const Parser &parser) { } sendRtspResponse("200 OK"); - _enable_send_rtp = false; - auto play_src = _play_src.lock(); if (play_src) { play_src->pause(true); diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 4157e4bd..d9caf1b1 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -165,8 +165,6 @@ private: private: //是否已经触发on_play事件 bool _emit_on_play = false; - //是否开始发送rtp - bool _enable_send_rtp; //推流或拉流客户端采用的rtp传输方式 Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; //收到的seq,回复时一致 diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 10ee5ad6..7e5e52d2 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -499,6 +499,7 @@ void WebRtcTransportImp::onStartWebRTC() { } } + _play_src->pause(false); _reader = _play_src->getRing()->attach(getPoller(), true); weak_ptr weak_self = static_pointer_cast(shared_from_this()); _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {