diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 712e25cf..48a38835 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -179,7 +179,7 @@ API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; /** - * 开始发送ps-rtp流 + * 开始发送一路ps-rtp流(通过ssrc区分多路) * @param ctx 对象指针 * @param dst_url 目标ip或域名 * @param dst_port 目标端口 @@ -191,11 +191,12 @@ typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data); /** - * 停止ps-rtp发送 + * 停止某路或全部ps-rtp发送 * @param ctx 对象指针 + * @param ssrc rtp的ssrc,10进制的字符串打印,如果为null或空字符串,则停止所有rtp推流 * @return 1成功,0失败 */ -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx); +API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc); #ifdef __cplusplus } diff --git a/api/include/mk_proxyplayer.h b/api/include/mk_proxyplayer.h index 82844fdf..0dcc0923 100644 --- a/api/include/mk_proxyplayer.h +++ b/api/include/mk_proxyplayer.h @@ -59,7 +59,7 @@ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *u * 如果你不调用mk_proxy_player_release函数,那么MediaSource.close()操作将无效 * @param user_data 用户数据指针,通过mk_proxy_player_set_on_close函数设置 */ -typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data); +typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data, int err, const char *what, int sys_err); /** * 监听MediaSource.close()事件 diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 8c20c87d..d9eb9634 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -193,16 +193,16 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; //sender参数无用 - (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*MediaSource::NullMediaSource, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { cb(user_data, local_port, ex.getErrCode(), ex.what()); } }); } -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx){ +API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){ assert(ctx); MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; //sender参数无用 - return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1, ""); + return (*obj)->getChannel()->stopSendRtp(*MediaSource::NullMediaSource, ssrc ? ssrc : ""); } \ No newline at end of file diff --git a/api/source/mk_proxyplayer.cpp b/api/source/mk_proxyplayer.cpp index 97f31567..f151ceb8 100644 --- a/api/source/mk_proxyplayer.cpp +++ b/api/source/mk_proxyplayer.cpp @@ -51,9 +51,9 @@ API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx); obj->getPoller()->async([obj,cb,user_data](){ //切换线程再操作 - obj->setOnClose([cb,user_data](){ + obj->setOnClose([cb,user_data](const SockException &ex){ if(cb){ - cb(user_data); + cb(user_data, ex.getErrCode(), ex.what(), ex.getCustomCode()); } }); }); diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 5990a8fe..3a5dde5d 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -672,7 +672,7 @@ void installWebApi() { }); //被主动关闭拉流 - player->setOnClose([key](){ + player->setOnClose([key](const SockException &ex){ lock_guard lck(s_proxyMapMtx); s_proxyMap.erase(key); }); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index b945bc21..c16acba1 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -192,11 +192,12 @@ private: */ class MediaSource: public TrackSource, public enable_shared_from_this { public: - typedef std::shared_ptr Ptr; - typedef unordered_map > StreamMap; - typedef unordered_map AppStreamMap; - typedef unordered_map VhostAppStreamMap; - typedef unordered_map SchemaVhostAppStreamMap; + static constexpr MediaSource *NullMediaSource = nullptr; + using Ptr = std::shared_ptr; + using StreamMap = unordered_map >; + using AppStreamMap = unordered_map; + using VhostAppStreamMap = unordered_map; + using SchemaVhostAppStreamMap = unordered_map; MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ; virtual ~MediaSource() ; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 7d707008..87543e22 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -338,7 +338,7 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ #if defined(ENABLE_RTPPROXY) RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); @@ -360,12 +360,14 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ #endif//ENABLE_RTPPROXY } -bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){ +bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) { #if defined(ENABLE_RTPPROXY) - onceToken token(nullptr, [&]() { - //关闭rtp推流,可能触发无人观看事件 - MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount()); - }); + if (&sender != MediaSource::NullMediaSource) { + onceToken token(nullptr, [&]() { + //关闭rtp推流,可能触发无人观看事件 + MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount()); + }); + } if (ssrc.empty()) { //关闭全部 lock_guard lck(_rtp_sender_mtx); diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index d4fb9959..9d826543 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -209,10 +209,22 @@ bool FrameMerger::willFlush(const Frame::Ptr &frame) const{ //时间戳变化了 return true; } - if (frame->getCodecId() == CodecH264 && - H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) { - //如果是264的b/p帧,那么也刷新输出 - return true; + switch (frame->getCodecId()) { + case CodecH264 : { + if (H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) { + //如果是264的b/p帧,那么也刷新输出 + return true; + } + break; + } + case CodecH265 : { + if (H265_TYPE(frame->data()[frame->prefixSize()]) == H265Frame::NAL_TRAIL_R) { + //如果是265的TRAIL_R帧,那么也刷新输出 + return true; + } + break; + } + default : break; } return _frameCached.size() > kMaxFrameCacheSize; } diff --git a/src/Extension/H265.cpp b/src/Extension/H265.cpp index 27f12d1b..76df36a7 100644 --- a/src/Extension/H265.cpp +++ b/src/Extension/H265.cpp @@ -51,7 +51,7 @@ bool getHEVCInfo(const string &strVps, const string &strSps, int &iVideoWidth, i ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// bool H265Frame::keyFrame() const { - return isKeyFrame(H265_TYPE(_buffer[_prefix_size])); + return isKeyFrame(H265_TYPE(_buffer[_prefix_size]), _buffer.data() + _prefix_size); } bool H265Frame::configFrame() const { @@ -63,8 +63,11 @@ bool H265Frame::configFrame() const { } } -bool H265Frame::isKeyFrame(int type) { - return type >= NAL_BLA_W_LP && type <= NAL_RSV_IRAP_VCL23; +bool H265Frame::isKeyFrame(int type, const char *ptr) { + if (!ptr || type != NAL_IDR_W_RADL) { + return type >= NAL_BLA_W_LP && type <= NAL_RSV_IRAP_VCL23; + } + return (((*((uint8_t *) ptr + 2)) >> 7) & 0x01) == 1; } H265Frame::H265Frame(){ @@ -83,7 +86,7 @@ H265FrameNoCacheAble::H265FrameNoCacheAble(char *ptr, size_t size, uint32_t dts, } bool H265FrameNoCacheAble::keyFrame() const { - return H265Frame::isKeyFrame(H265_TYPE(((uint8_t *) _ptr)[_prefix_size])); + return H265Frame::isKeyFrame(H265_TYPE(((uint8_t *) _ptr)[_prefix_size]), _ptr + _prefix_size); } bool H265FrameNoCacheAble::configFrame() const { @@ -152,13 +155,12 @@ void H265Track::inputFrame(const Frame::Ptr &frame) { void H265Track::inputFrame_l(const Frame::Ptr &frame) { int type = H265_TYPE(((uint8_t *) frame->data() + frame->prefixSize())[0]); - if (H265Frame::isKeyFrame(type)) { + if (H265Frame::isKeyFrame(type, frame->data() + frame->prefixSize())) { insertConfigFrame(frame); VideoTrack::inputFrame(frame); _is_idr = true; return; } - _is_idr = false; //非idr帧 diff --git a/src/Extension/H265.h b/src/Extension/H265.h index c9dc671a..fe24df3f 100644 --- a/src/Extension/H265.h +++ b/src/Extension/H265.h @@ -61,7 +61,7 @@ public: bool keyFrame() const override; bool configFrame() const override; - static bool isKeyFrame(int type); + static bool isKeyFrame(int type, const char* ptr); protected: friend class FrameImp; diff --git a/src/Extension/H265Rtmp.cpp b/src/Extension/H265Rtmp.cpp index b3ae4fa9..3a8c32ab 100644 --- a/src/Extension/H265Rtmp.cpp +++ b/src/Extension/H265Rtmp.cpp @@ -169,7 +169,7 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { return; } - if(_lastPacket && _lastPacket->time_stamp != frame->dts()) { + if (_lastPacket && (_lastPacket->time_stamp != frame->dts() || type == H265Frame::NAL_TRAIL_R)) { RtmpCodec::inputRtmp(_lastPacket); _lastPacket = nullptr; } diff --git a/src/Extension/H265Rtp.cpp b/src/Extension/H265Rtp.cpp index 262ba347..c3079313 100644 --- a/src/Extension/H265Rtp.cpp +++ b/src/Extension/H265Rtp.cpp @@ -263,13 +263,13 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { //FU 第1个字节,表明为FU payload[0] = 49 << 1; //FU 第2个字节貌似固定为1 - payload[1] = 1; + payload[1] = ptr[1];// 1; //FU 第3个字节 payload[2] = s_e_flags; //H265 数据 memcpy(payload + 3, ptr + offset, max_size); //输入到rtp环形缓存 - RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type)); + RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type, frame->data() + frame->prefixSize())); } offset += max_size; @@ -281,7 +281,7 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { } void H265RtpEncoder::makeH265Rtp(int nal_type,const void* data, size_t len, bool mark, bool first_packet, uint32_t uiStamp) { - RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type)); + RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type, (const char*)data + prefixSize((const char*)data, len))); } }//namespace mediakit diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 37602dcb..c3cce623 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -111,7 +111,6 @@ protected: * 需要指出的是,在http头中带有Content-Length字段时,该返回值无效 */ virtual ssize_t onResponseHeader(const string &status,const HttpHeader &headers){ - DebugL << status; //无Content-Length字段时默认后面全是content return -1; } diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 40ffa33b..5dbb1b55 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -48,62 +48,66 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, PlayerProxy::PlayerProxy(const string &vhost, const string &app, const string &stream_id, bool enable_hls, bool enable_mp4, int retry_count, const EventPoller::Ptr &poller) - : MediaPlayer(poller) { + : MediaPlayer(poller) { _vhost = vhost; _app = app; _stream_id = stream_id; _enable_hls = enable_hls; _enable_mp4 = enable_mp4; _retry_count = retry_count; + _on_close = [](const SockException &) {}; } -void PlayerProxy::setPlayCallbackOnce(const function &cb){ +void PlayerProxy::setPlayCallbackOnce(const function &cb) { _on_play = cb; } -void PlayerProxy::setOnClose(const function &cb){ - _on_close = cb; +void PlayerProxy::setOnClose(const function &cb) { + _on_close = cb ? cb : [](const SockException &) {}; } void PlayerProxy::play(const string &strUrlTmp) { weak_ptr weakSelf = shared_from_this(); std::shared_ptr piFailedCnt(new int(0)); //连续播放失败次数 - setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { + setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + if (!strongSelf) { return; } - if(strongSelf->_on_play) { + if (strongSelf->_on_play) { strongSelf->_on_play(err); strongSelf->_on_play = nullptr; } - if(!err) { + if (!err) { // 播放成功 *piFailedCnt = 0;//连续播放失败次数清0 strongSelf->onPlaySuccess(); - }else if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { // 播放失败,延时重试播放 - strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); + strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); } }); - setOnShutdown([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { + setOnShutdown([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + if (!strongSelf) { return; } //注销直接拉流代理产生的流:#532 strongSelf->setMediaSource(nullptr); - if(strongSelf->_muxer) { + if (strongSelf->_muxer) { auto tracks = strongSelf->MediaPlayer::getTracks(false); - for (auto & track : tracks){ + for (auto &track : tracks) { track->delDelegate(strongSelf->_muxer.get()); } - GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); + GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay); if (resetWhenRePlay) { strongSelf->_muxer.reset(); } else { @@ -111,8 +115,11 @@ void PlayerProxy::play(const string &strUrlTmp) { } } //播放异常中断,延时重试播放 - if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { - strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); + if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); } }); MediaPlayer::play(strUrlTmp); @@ -120,7 +127,7 @@ void PlayerProxy::play(const string &strUrlTmp) { setDirectProxy(); } -void PlayerProxy::setDirectProxy(){ +void PlayerProxy::setDirectProxy() { MediaSource::Ptr mediaSource; if (dynamic_pointer_cast(_delegate)) { //rtsp拉流 @@ -142,24 +149,24 @@ PlayerProxy::~PlayerProxy() { _timer.reset(); } -void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ - auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60*1000)); +void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) { + auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { + _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, strUrl, iFailedCnt]() { //播放失败次数越多,则延时越长 auto strongPlayer = weakSelf.lock(); - if(!strongPlayer) { + if (!strongPlayer) { return false; } - WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; + WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; strongPlayer->MediaPlayer::play(strUrl); strongPlayer->setDirectProxy(); return false; }, getPoller()); } -bool PlayerProxy::close(MediaSource &sender,bool force) { - if(!force && totalReaderCount()){ +bool PlayerProxy::close(MediaSource &sender, bool force) { + if (!force && totalReaderCount()) { return false; } @@ -174,14 +181,12 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { strongSelf->setMediaSource(nullptr); strongSelf->teardown(); }); - if (_on_close) { - _on_close(); - } + _on_close(SockException(Err_shutdown, "closed by user")); WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } -int PlayerProxy::totalReaderCount(){ +int PlayerProxy::totalReaderCount() { return (_muxer ? _muxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); } @@ -189,29 +194,29 @@ int PlayerProxy::totalReaderCount(MediaSource &sender) { return totalReaderCount(); } -MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const{ +MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const { return MediaOriginType::pull; } -string PlayerProxy::getOriginUrl(MediaSource &sender) const{ +string PlayerProxy::getOriginUrl(MediaSource &sender) const { return _pull_url; } -std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const{ +std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const { return getSockInfo(); } -class MuteAudioMaker : public FrameDispatcher{ +class MuteAudioMaker : public FrameDispatcher { public: typedef std::shared_ptr Ptr; - MuteAudioMaker(){}; + MuteAudioMaker() {}; ~MuteAudioMaker() override {} void inputFrame(const Frame::Ptr &frame) override { - if(frame->getTrackType() == TrackVideo){ + if (frame->getTrackType() == TrackVideo) { auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS; - if(_audio_idx != audio_idx){ + if (_audio_idx != audio_idx) { _audio_idx = audio_idx; auto aacFrame = std::make_shared(CodecAAC, (char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _audio_idx * MUTE_ADTS_DATA_MS, 0 ,ADTS_HEADER_LEN); FrameDispatcher::inputFrame(aacFrame); @@ -220,10 +225,10 @@ public: } private: - class FrameFromStaticPtr : public FrameFromPtr{ + class FrameFromStaticPtr : public FrameFromPtr { public: - template - FrameFromStaticPtr(ARGS && ...args) : FrameFromPtr(std::forward(args)...) {}; + template + FrameFromStaticPtr(ARGS &&...args) : FrameFromPtr(std::forward(args)...) {}; ~FrameFromStaticPtr() override = default; bool cacheAble() const override { @@ -236,7 +241,7 @@ private: }; void PlayerProxy::onPlaySuccess() { - GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); + GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay); if (dynamic_pointer_cast(_pMediaSrc)) { //rtsp拉流代理 if (resetWhenRePlay || !_muxer) { diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 3d41d55a..613afe4f 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -34,15 +34,15 @@ public: /** * 设置play结果回调,只触发一次;在play执行之前有效 - * @param cb + * @param cb 回调对象 */ void setPlayCallbackOnce(const function &cb); /** * 设置主动关闭回调 - * @param cb + * @param cb 回调对象 */ - void setOnClose(const function &cb); + void setOnClose(const function &cb); /** * 开始拉流播放 @@ -76,7 +76,7 @@ private: string _stream_id; string _pull_url; Timer::Ptr _timer; - function _on_close; + function _on_close; function _on_play; MultiMediaSourceMuxer::Ptr _muxer; }; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index b061cb29..9a4658a5 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -502,7 +502,6 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { void RtmpSession::onCmd_seek(AMFDecoder &dec) { dec.load();/* NULL */ AMFValue status(AMF_OBJECT); - AMFEncoder invoke; status.set("level", "status"); status.set("code", "NetStream.Seek.Notify"); status.set("description", "Seeking."); diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index cb979bc8..60b04baa 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -104,22 +104,19 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt switch (codecid) { case PSI_STREAM_H264: { InfoL << "got video track: H264"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } case PSI_STREAM_H265: { InfoL << "got video track: H265"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } case PSI_STREAM_AAC: { InfoL<< "got audio track: AAC"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } @@ -128,15 +125,13 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; InfoL << "got audio track: G711"; //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 - auto track = std::make_shared(codec, 8000, 1, 16); - onTrack(track); + onTrack(std::make_shared(codec, 8000, 1, 16)); break; } case PSI_STREAM_AUDIO_OPUS: { InfoL << "got audio track: opus"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } @@ -159,6 +154,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d switch (codecid) { case PSI_STREAM_H264: { + if (!_tracks[TrackVideo]) { + onTrack(std::make_shared()); + } auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -167,6 +165,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d } case PSI_STREAM_H265: { + if (!_tracks[TrackVideo]) { + onTrack(std::make_shared()); + } auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -180,6 +181,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d //这不是aac break; } + if (!_tracks[TrackAudio]) { + onTrack(std::make_shared()); + } onFrame(std::make_shared(CodecAAC, (char *) data, bytes, (uint32_t)dts, 0, ADTS_HEADER_LEN)); break; } @@ -187,11 +191,18 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d case PSI_STREAM_AUDIO_G711A: case PSI_STREAM_AUDIO_G711U: { auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; + if (!_tracks[TrackAudio]) { + //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 + onTrack(std::make_shared(codec, 8000, 1, 16)); + } onFrame(std::make_shared(codec, (char *) data, bytes, (uint32_t)dts)); break; } case PSI_STREAM_AUDIO_OPUS: { + if (!_tracks[TrackAudio]) { + onTrack(std::make_shared()); + } onFrame(std::make_shared(CodecOpus, (char *) data, bytes, (uint32_t)dts)); break; } @@ -212,6 +223,7 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes, #endif void DecoderImp::onTrack(const Track::Ptr &track) { + _tracks[track->getTrackType()] = track; _sink->addTrack(track); } diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index a75a79f8..2f69f7c7 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -61,6 +61,7 @@ private: MediaSinkInterface *_sink; FrameMerger _merger{FrameMerger::none}; Ticker _last_unsported_print; + Track::Ptr _tracks[TrackMax]; }; }//namespace mediakit diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 7601bfdf..3cc50dbb 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -61,7 +61,7 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 _ssrc_alive[index].resetTime(); } else { //ssrc错误 - if (_ssrc_alive[index].elapsedTime() < 10 * 1000) { + if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { //接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; return false; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c14a7978..1e12adee 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -99,11 +99,16 @@ void RtspSession::onManager() { } } - if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000 && _enable_send_rtp) { - //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 - shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); + if (_push_src && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) { + //推流超时 + shutdown(SockException(Err_timeout, "pusher session timeouted")); return; } + + if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _enable_send_rtp && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) { + //rtp over udp播放器超时 + shutdown(SockException(Err_timeout, "rtp over udp player timeouted")); + } } void RtspSession::onRecv(const Buffer::Ptr &buf) { diff --git a/tests/test_pusherMp4.cpp b/tests/test_pusherMp4.cpp index a33bf0a3..c72f52b1 100644 --- a/tests/test_pusherMp4.cpp +++ b/tests/test_pusherMp4.cpp @@ -24,9 +24,9 @@ using namespace toolkit; using namespace mediakit; //推流器,保持强引用 -MediaPusher::Ptr pusher; +MediaPusher::Ptr g_pusher; Timer::Ptr g_timer; - +MediaSource::Ptr g_src; //声明函数 //推流失败或断开延迟2秒后重试推流 @@ -36,7 +36,7 @@ void rePushDelay(const EventPoller::Ptr &poller, const string &app, const string &stream, const string &filePath, - const string &url) ; + const string &url); //创建推流器并开始推流 void createPusher(const EventPoller::Ptr &poller, @@ -46,36 +46,39 @@ void createPusher(const EventPoller::Ptr &poller, const string &stream, const string &filePath, const string &url) { - //不限制APP名,并且指定文件绝对路径 - auto src = MediaSource::createFromMP4(schema,vhost,app,stream,filePath, false); - if(!src){ + if (!g_src) { + //不限制APP名,并且指定文件绝对路径 + g_src = MediaSource::createFromMP4(schema, vhost, app, stream, filePath, false); + } + if (!g_src) { //文件不存在 WarnL << "MP4文件不存在:" << filePath; return; } //创建推流器并绑定一个MediaSource - pusher.reset(new MediaPusher(src,poller)); + g_pusher.reset(new MediaPusher(g_src, poller)); //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp -// (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; + //(*g_pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 - pusher->setOnShutdown([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { + g_pusher->setOnShutdown([poller, schema, vhost, app, stream, filePath, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重新推流 - rePushDelay(poller,schema,vhost,app, stream,filePath, url); + rePushDelay(poller, schema, vhost, app, stream, filePath, url); }); + //设置发布结果处理逻辑 - pusher->setOnPublished([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { + g_pusher->setOnPublished([poller, schema, vhost, app, stream, filePath, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(poller,schema,vhost,app, stream, filePath ,url); - }else { + rePushDelay(poller, schema, vhost, app, stream, filePath, url); + } else { InfoL << "Publish success,Please play with player:" << url; } }); - pusher->publish(url); + g_pusher->publish(url); } //推流失败或断开延迟2秒后重试推流 @@ -86,39 +89,44 @@ void rePushDelay(const EventPoller::Ptr &poller, const string &stream, const string &filePath, const string &url) { - g_timer = std::make_shared(2.0f,[poller,schema,vhost,app, stream, filePath,url]() { + g_timer = std::make_shared(2.0f, [poller, schema, vhost, app, stream, filePath, url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(poller,schema,vhost,app, stream, filePath,url); + createPusher(poller, schema, vhost, app, stream, filePath, url); //此任务不重复 return false; }, poller); } //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 -int domain(const string & filePath,const string & pushUrl){ +int domain(const string &filePath, const string &pushUrl) { //设置日志 Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); + //循环点播mp4文件 + mINI::Instance()[Record::kFileRepeat] = 1; + mINI::Instance()[General::kHlsDemand] = 1; + mINI::Instance()[General::kTSDemand] = 1; + mINI::Instance()[General::kFMP4Demand] = 1; + //mINI::Instance()[General::kRtspDemand] = 1; + //mINI::Instance()[General::kRtmpDemand] = 1; + auto poller = EventPollerPool::Instance().getPoller(); //vhost/app/stream可以随便自己填,现在不限制app应用名了 - createPusher(poller,FindField(pushUrl.data(), nullptr,"://").substr(0,4),DEFAULT_VHOST,"live","stream",filePath,pushUrl); - + createPusher(poller, FindField(pushUrl.data(), nullptr, "://").substr(0, 4), DEFAULT_VHOST, "live", "stream", filePath, pushUrl); //设置退出信号处理函数 static semaphore sem; signal(SIGINT, [](int) { sem.post(); });// 设置退出信号 sem.wait(); - pusher.reset(); + g_pusher.reset(); g_timer.reset(); return 0; } - - -int main(int argc,char *argv[]){ +int main(int argc, char *argv[]) { //可以使用test_server生成的mp4文件 //文件使用绝对路径,推流url支持rtsp和rtmp - return domain("/Users/xzl/git/ZLMediaKit/release/mac/Debug/www/record/live/rtsp_test1/2020-04-03/15-32-24.mp4","rtsp://127.0.0.1/live/rtsp_push"); + return domain("/home/work/test2.mp4", "rtmp://127.0.0.1/live/rtsp_push"); }