diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 712e25cf..48a38835 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -179,7 +179,7 @@ API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; /** - * 开始发送ps-rtp流 + * 开始发送一路ps-rtp流(通过ssrc区分多路) * @param ctx 对象指针 * @param dst_url 目标ip或域名 * @param dst_port 目标端口 @@ -191,11 +191,12 @@ typedef on_mk_media_source_send_rtp_result on_mk_media_send_rtp_result; API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data); /** - * 停止ps-rtp发送 + * 停止某路或全部ps-rtp发送 * @param ctx 对象指针 + * @param ssrc rtp的ssrc,10进制的字符串打印,如果为null或空字符串,则停止所有rtp推流 * @return 1成功,0失败 */ -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx); +API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc); #ifdef __cplusplus } diff --git a/api/include/mk_proxyplayer.h b/api/include/mk_proxyplayer.h index 82844fdf..0dcc0923 100644 --- a/api/include/mk_proxyplayer.h +++ b/api/include/mk_proxyplayer.h @@ -59,7 +59,7 @@ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *u * 如果你不调用mk_proxy_player_release函数,那么MediaSource.close()操作将无效 * @param user_data 用户数据指针,通过mk_proxy_player_set_on_close函数设置 */ -typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data); +typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data, int err, const char *what, int sys_err); /** * 监听MediaSource.close()事件 diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 8c20c87d..d9eb9634 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -193,16 +193,16 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; //sender参数无用 - (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*MediaSource::NullMediaSource, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { cb(user_data, local_port, ex.getErrCode(), ex.what()); } }); } -API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx){ +API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){ assert(ctx); MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; //sender参数无用 - return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1, ""); + return (*obj)->getChannel()->stopSendRtp(*MediaSource::NullMediaSource, ssrc ? ssrc : ""); } \ No newline at end of file diff --git a/api/source/mk_proxyplayer.cpp b/api/source/mk_proxyplayer.cpp index 97f31567..f151ceb8 100644 --- a/api/source/mk_proxyplayer.cpp +++ b/api/source/mk_proxyplayer.cpp @@ -51,9 +51,9 @@ API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx); obj->getPoller()->async([obj,cb,user_data](){ //切换线程再操作 - obj->setOnClose([cb,user_data](){ + obj->setOnClose([cb,user_data](const SockException &ex){ if(cb){ - cb(user_data); + cb(user_data, ex.getErrCode(), ex.what(), ex.getCustomCode()); } }); }); diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 5990a8fe..3a5dde5d 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -672,7 +672,7 @@ void installWebApi() { }); //被主动关闭拉流 - player->setOnClose([key](){ + player->setOnClose([key](const SockException &ex){ lock_guard lck(s_proxyMapMtx); s_proxyMap.erase(key); }); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index b945bc21..c16acba1 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -192,11 +192,12 @@ private: */ class MediaSource: public TrackSource, public enable_shared_from_this { public: - typedef std::shared_ptr Ptr; - typedef unordered_map > StreamMap; - typedef unordered_map AppStreamMap; - typedef unordered_map VhostAppStreamMap; - typedef unordered_map SchemaVhostAppStreamMap; + static constexpr MediaSource *NullMediaSource = nullptr; + using Ptr = std::shared_ptr; + using StreamMap = unordered_map >; + using AppStreamMap = unordered_map; + using VhostAppStreamMap = unordered_map; + using SchemaVhostAppStreamMap = unordered_map; MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ; virtual ~MediaSource() ; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 7d707008..87543e22 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -338,7 +338,7 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ #if defined(ENABLE_RTPPROXY) RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); @@ -360,12 +360,14 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ #endif//ENABLE_RTPPROXY } -bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){ +bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) { #if defined(ENABLE_RTPPROXY) - onceToken token(nullptr, [&]() { - //关闭rtp推流,可能触发无人观看事件 - MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount()); - }); + if (&sender != MediaSource::NullMediaSource) { + onceToken token(nullptr, [&]() { + //关闭rtp推流,可能触发无人观看事件 + MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount()); + }); + } if (ssrc.empty()) { //关闭全部 lock_guard lck(_rtp_sender_mtx); diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index d4fb9959..b43206db 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -106,47 +106,27 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){ return std::make_shared(frame); } -TrackType getTrackType(CodecId codecId){ - switch (codecId){ - case CodecVP8: - case CodecVP9: - case CodecH264: - case CodecH265: return TrackVideo; - case CodecAAC: - case CodecG711A: - case CodecG711U: - case CodecOpus: - case CodecL16: return TrackAudio; - default: return TrackInvalid; +TrackType getTrackType(CodecId codecId) { + switch (codecId) { +#define XX(name, type, value, str) case name : return type; + CODEC_MAP(XX) +#undef XX + default : return TrackInvalid; } } -const char* getCodecName(CodecId codec){ +const char *getCodecName(CodecId codec) { switch (codec) { - case CodecH264 : return "H264"; - case CodecH265 : return "H265"; - case CodecAAC : return "mpeg4-generic"; - case CodecG711A : return "PCMA"; - case CodecG711U : return "PCMU"; - case CodecOpus : return "opus"; - case CodecVP8 : return "VP8"; - case CodecVP9 : return "VP9"; - case CodecL16 : return "L16"; - default: return "invalid"; +#define XX(name, type, value, str) case name : return str; + CODEC_MAP(XX) +#undef XX + default : return "invalid"; } } -static map codec_map = { - {"H264", CodecH264}, - {"H265", CodecH265}, - {"mpeg4-generic", CodecAAC}, - {"PCMA", CodecG711A}, - {"PCMU", CodecG711U}, - {"opus", CodecOpus}, - {"VP8", CodecVP8}, - {"VP9", CodecVP9}, - {"L16", CodecL16} -}; +#define XX(name, type, value, str) {str, name}, +static map codec_map = {CODEC_MAP(XX)}; +#undef XX CodecId getCodecId(const string &str){ auto it = codec_map.find(str); @@ -209,10 +189,22 @@ bool FrameMerger::willFlush(const Frame::Ptr &frame) const{ //时间戳变化了 return true; } - if (frame->getCodecId() == CodecH264 && - H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) { - //如果是264的b/p帧,那么也刷新输出 - return true; + switch (frame->getCodecId()) { + case CodecH264 : { + if (H264_TYPE(frame->data()[frame->prefixSize()]) == H264Frame::NAL_B_P) { + //如果是264的b/p帧,那么也刷新输出 + return true; + } + break; + } + case CodecH265 : { + if (H265_TYPE(frame->data()[frame->prefixSize()]) == H265Frame::NAL_TRAIL_R) { + //如果是265的TRAIL_R帧,那么也刷新输出 + return true; + } + break; + } + default : break; } return _frameCached.size() > kMaxFrameCacheSize; } diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index ce242f71..ca8a2b39 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -21,20 +21,6 @@ using namespace toolkit; namespace mediakit{ -typedef enum { - CodecInvalid = -1, - CodecH264 = 0, - CodecH265, - CodecAAC, - CodecG711A, - CodecG711U, - CodecOpus, - CodecL16, - CodecVP8, - CodecVP9, - CodecMax -} CodecId; - typedef enum { TrackInvalid = -1, TrackVideo = 0, @@ -44,6 +30,26 @@ typedef enum { TrackMax } TrackType; +#define CODEC_MAP(XX) \ + XX(CodecH264, TrackVideo, 0, "H264") \ + XX(CodecH265, TrackVideo, 1, "H265") \ + XX(CodecAAC, TrackAudio, 2, "mpeg4-generic") \ + XX(CodecG711A, TrackAudio, 3, "PCMA") \ + XX(CodecG711U, TrackAudio, 4, "PCMU") \ + XX(CodecOpus, TrackAudio, 5, "opus") \ + XX(CodecL16, TrackAudio, 6, "L16") \ + XX(CodecVP8, TrackVideo, 7, "VP8") \ + XX(CodecVP9, TrackVideo, 8, "VP9") \ + XX(CodecAV1, TrackVideo, 9, "AV1X") + +typedef enum { + CodecInvalid = -1, +#define XX(name, type, value, str) name = value, + CODEC_MAP(XX) +#undef XX + CodecMax +} CodecId; + /** * 字符串转媒体类型转 */ diff --git a/src/Extension/H265.cpp b/src/Extension/H265.cpp index 27f12d1b..76df36a7 100644 --- a/src/Extension/H265.cpp +++ b/src/Extension/H265.cpp @@ -51,7 +51,7 @@ bool getHEVCInfo(const string &strVps, const string &strSps, int &iVideoWidth, i ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// bool H265Frame::keyFrame() const { - return isKeyFrame(H265_TYPE(_buffer[_prefix_size])); + return isKeyFrame(H265_TYPE(_buffer[_prefix_size]), _buffer.data() + _prefix_size); } bool H265Frame::configFrame() const { @@ -63,8 +63,11 @@ bool H265Frame::configFrame() const { } } -bool H265Frame::isKeyFrame(int type) { - return type >= NAL_BLA_W_LP && type <= NAL_RSV_IRAP_VCL23; +bool H265Frame::isKeyFrame(int type, const char *ptr) { + if (!ptr || type != NAL_IDR_W_RADL) { + return type >= NAL_BLA_W_LP && type <= NAL_RSV_IRAP_VCL23; + } + return (((*((uint8_t *) ptr + 2)) >> 7) & 0x01) == 1; } H265Frame::H265Frame(){ @@ -83,7 +86,7 @@ H265FrameNoCacheAble::H265FrameNoCacheAble(char *ptr, size_t size, uint32_t dts, } bool H265FrameNoCacheAble::keyFrame() const { - return H265Frame::isKeyFrame(H265_TYPE(((uint8_t *) _ptr)[_prefix_size])); + return H265Frame::isKeyFrame(H265_TYPE(((uint8_t *) _ptr)[_prefix_size]), _ptr + _prefix_size); } bool H265FrameNoCacheAble::configFrame() const { @@ -152,13 +155,12 @@ void H265Track::inputFrame(const Frame::Ptr &frame) { void H265Track::inputFrame_l(const Frame::Ptr &frame) { int type = H265_TYPE(((uint8_t *) frame->data() + frame->prefixSize())[0]); - if (H265Frame::isKeyFrame(type)) { + if (H265Frame::isKeyFrame(type, frame->data() + frame->prefixSize())) { insertConfigFrame(frame); VideoTrack::inputFrame(frame); _is_idr = true; return; } - _is_idr = false; //非idr帧 diff --git a/src/Extension/H265.h b/src/Extension/H265.h index c9dc671a..fe24df3f 100644 --- a/src/Extension/H265.h +++ b/src/Extension/H265.h @@ -61,7 +61,7 @@ public: bool keyFrame() const override; bool configFrame() const override; - static bool isKeyFrame(int type); + static bool isKeyFrame(int type, const char* ptr); protected: friend class FrameImp; diff --git a/src/Extension/H265Rtmp.cpp b/src/Extension/H265Rtmp.cpp index b3ae4fa9..3a8c32ab 100644 --- a/src/Extension/H265Rtmp.cpp +++ b/src/Extension/H265Rtmp.cpp @@ -169,7 +169,7 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { return; } - if(_lastPacket && _lastPacket->time_stamp != frame->dts()) { + if (_lastPacket && (_lastPacket->time_stamp != frame->dts() || type == H265Frame::NAL_TRAIL_R)) { RtmpCodec::inputRtmp(_lastPacket); _lastPacket = nullptr; } diff --git a/src/Extension/H265Rtp.cpp b/src/Extension/H265Rtp.cpp index 262ba347..c3079313 100644 --- a/src/Extension/H265Rtp.cpp +++ b/src/Extension/H265Rtp.cpp @@ -263,13 +263,13 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { //FU 第1个字节,表明为FU payload[0] = 49 << 1; //FU 第2个字节貌似固定为1 - payload[1] = 1; + payload[1] = ptr[1];// 1; //FU 第3个字节 payload[2] = s_e_flags; //H265 数据 memcpy(payload + 3, ptr + offset, max_size); //输入到rtp环形缓存 - RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type)); + RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type, frame->data() + frame->prefixSize())); } offset += max_size; @@ -281,7 +281,7 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { } void H265RtpEncoder::makeH265Rtp(int nal_type,const void* data, size_t len, bool mark, bool first_packet, uint32_t uiStamp) { - RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type)); + RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type, (const char*)data + prefixSize((const char*)data, len))); } }//namespace mediakit diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 37602dcb..c3cce623 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -111,7 +111,6 @@ protected: * 需要指出的是,在http头中带有Content-Length字段时,该返回值无效 */ virtual ssize_t onResponseHeader(const string &status,const HttpHeader &headers){ - DebugL << status; //无Content-Length字段时默认后面全是content return -1; } diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 40ffa33b..5dbb1b55 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -48,62 +48,66 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, PlayerProxy::PlayerProxy(const string &vhost, const string &app, const string &stream_id, bool enable_hls, bool enable_mp4, int retry_count, const EventPoller::Ptr &poller) - : MediaPlayer(poller) { + : MediaPlayer(poller) { _vhost = vhost; _app = app; _stream_id = stream_id; _enable_hls = enable_hls; _enable_mp4 = enable_mp4; _retry_count = retry_count; + _on_close = [](const SockException &) {}; } -void PlayerProxy::setPlayCallbackOnce(const function &cb){ +void PlayerProxy::setPlayCallbackOnce(const function &cb) { _on_play = cb; } -void PlayerProxy::setOnClose(const function &cb){ - _on_close = cb; +void PlayerProxy::setOnClose(const function &cb) { + _on_close = cb ? cb : [](const SockException &) {}; } void PlayerProxy::play(const string &strUrlTmp) { weak_ptr weakSelf = shared_from_this(); std::shared_ptr piFailedCnt(new int(0)); //连续播放失败次数 - setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { + setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + if (!strongSelf) { return; } - if(strongSelf->_on_play) { + if (strongSelf->_on_play) { strongSelf->_on_play(err); strongSelf->_on_play = nullptr; } - if(!err) { + if (!err) { // 播放成功 *piFailedCnt = 0;//连续播放失败次数清0 strongSelf->onPlaySuccess(); - }else if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { // 播放失败,延时重试播放 - strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); + strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); } }); - setOnShutdown([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { + setOnShutdown([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + if (!strongSelf) { return; } //注销直接拉流代理产生的流:#532 strongSelf->setMediaSource(nullptr); - if(strongSelf->_muxer) { + if (strongSelf->_muxer) { auto tracks = strongSelf->MediaPlayer::getTracks(false); - for (auto & track : tracks){ + for (auto &track : tracks) { track->delDelegate(strongSelf->_muxer.get()); } - GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); + GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay); if (resetWhenRePlay) { strongSelf->_muxer.reset(); } else { @@ -111,8 +115,11 @@ void PlayerProxy::play(const string &strUrlTmp) { } } //播放异常中断,延时重试播放 - if(*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { - strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); + if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { + strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); + } else { + //达到了最大重试次数,回调关闭 + strongSelf->_on_close(err); } }); MediaPlayer::play(strUrlTmp); @@ -120,7 +127,7 @@ void PlayerProxy::play(const string &strUrlTmp) { setDirectProxy(); } -void PlayerProxy::setDirectProxy(){ +void PlayerProxy::setDirectProxy() { MediaSource::Ptr mediaSource; if (dynamic_pointer_cast(_delegate)) { //rtsp拉流 @@ -142,24 +149,24 @@ PlayerProxy::~PlayerProxy() { _timer.reset(); } -void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ - auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60*1000)); +void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) { + auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000, 60 * 1000)); weak_ptr weakSelf = shared_from_this(); - _timer = std::make_shared(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { + _timer = std::make_shared(iDelay / 1000.0f, [weakSelf, strUrl, iFailedCnt]() { //播放失败次数越多,则延时越长 auto strongPlayer = weakSelf.lock(); - if(!strongPlayer) { + if (!strongPlayer) { return false; } - WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; + WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; strongPlayer->MediaPlayer::play(strUrl); strongPlayer->setDirectProxy(); return false; }, getPoller()); } -bool PlayerProxy::close(MediaSource &sender,bool force) { - if(!force && totalReaderCount()){ +bool PlayerProxy::close(MediaSource &sender, bool force) { + if (!force && totalReaderCount()) { return false; } @@ -174,14 +181,12 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { strongSelf->setMediaSource(nullptr); strongSelf->teardown(); }); - if (_on_close) { - _on_close(); - } + _on_close(SockException(Err_shutdown, "closed by user")); WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } -int PlayerProxy::totalReaderCount(){ +int PlayerProxy::totalReaderCount() { return (_muxer ? _muxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); } @@ -189,29 +194,29 @@ int PlayerProxy::totalReaderCount(MediaSource &sender) { return totalReaderCount(); } -MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const{ +MediaOriginType PlayerProxy::getOriginType(MediaSource &sender) const { return MediaOriginType::pull; } -string PlayerProxy::getOriginUrl(MediaSource &sender) const{ +string PlayerProxy::getOriginUrl(MediaSource &sender) const { return _pull_url; } -std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const{ +std::shared_ptr PlayerProxy::getOriginSock(MediaSource &sender) const { return getSockInfo(); } -class MuteAudioMaker : public FrameDispatcher{ +class MuteAudioMaker : public FrameDispatcher { public: typedef std::shared_ptr Ptr; - MuteAudioMaker(){}; + MuteAudioMaker() {}; ~MuteAudioMaker() override {} void inputFrame(const Frame::Ptr &frame) override { - if(frame->getTrackType() == TrackVideo){ + if (frame->getTrackType() == TrackVideo) { auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS; - if(_audio_idx != audio_idx){ + if (_audio_idx != audio_idx) { _audio_idx = audio_idx; auto aacFrame = std::make_shared(CodecAAC, (char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _audio_idx * MUTE_ADTS_DATA_MS, 0 ,ADTS_HEADER_LEN); FrameDispatcher::inputFrame(aacFrame); @@ -220,10 +225,10 @@ public: } private: - class FrameFromStaticPtr : public FrameFromPtr{ + class FrameFromStaticPtr : public FrameFromPtr { public: - template - FrameFromStaticPtr(ARGS && ...args) : FrameFromPtr(std::forward(args)...) {}; + template + FrameFromStaticPtr(ARGS &&...args) : FrameFromPtr(std::forward(args)...) {}; ~FrameFromStaticPtr() override = default; bool cacheAble() const override { @@ -236,7 +241,7 @@ private: }; void PlayerProxy::onPlaySuccess() { - GET_CONFIG(bool,resetWhenRePlay,General::kResetWhenRePlay); + GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay); if (dynamic_pointer_cast(_pMediaSrc)) { //rtsp拉流代理 if (resetWhenRePlay || !_muxer) { diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 3d41d55a..613afe4f 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -34,15 +34,15 @@ public: /** * 设置play结果回调,只触发一次;在play执行之前有效 - * @param cb + * @param cb 回调对象 */ void setPlayCallbackOnce(const function &cb); /** * 设置主动关闭回调 - * @param cb + * @param cb 回调对象 */ - void setOnClose(const function &cb); + void setOnClose(const function &cb); /** * 开始拉流播放 @@ -76,7 +76,7 @@ private: string _stream_id; string _pull_url; Timer::Ptr _timer; - function _on_close; + function _on_close; function _on_play; MultiMediaSourceMuxer::Ptr _muxer; }; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index b061cb29..9a4658a5 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -502,7 +502,6 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { void RtmpSession::onCmd_seek(AMFDecoder &dec) { dec.load();/* NULL */ AMFValue status(AMF_OBJECT); - AMFEncoder invoke; status.set("level", "status"); status.set("code", "NetStream.Seek.Notify"); status.set("description", "Seeking."); diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index cb979bc8..60b04baa 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -104,22 +104,19 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt switch (codecid) { case PSI_STREAM_H264: { InfoL << "got video track: H264"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } case PSI_STREAM_H265: { InfoL << "got video track: H265"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } case PSI_STREAM_AAC: { InfoL<< "got audio track: AAC"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } @@ -128,15 +125,13 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; InfoL << "got audio track: G711"; //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 - auto track = std::make_shared(codec, 8000, 1, 16); - onTrack(track); + onTrack(std::make_shared(codec, 8000, 1, 16)); break; } case PSI_STREAM_AUDIO_OPUS: { InfoL << "got audio track: opus"; - auto track = std::make_shared(); - onTrack(track); + onTrack(std::make_shared()); break; } @@ -159,6 +154,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d switch (codecid) { case PSI_STREAM_H264: { + if (!_tracks[TrackVideo]) { + onTrack(std::make_shared()); + } auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -167,6 +165,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d } case PSI_STREAM_H265: { + if (!_tracks[TrackVideo]) { + onTrack(std::make_shared()); + } auto frame = std::make_shared((char *) data, bytes, (uint32_t)dts, (uint32_t)pts, prefixSize((char *) data, bytes)); _merger.inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer, bool) { onFrame(std::make_shared >(buffer, dts, pts, prefixSize(buffer->data(), buffer->size()), 0)); @@ -180,6 +181,9 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d //这不是aac break; } + if (!_tracks[TrackAudio]) { + onTrack(std::make_shared()); + } onFrame(std::make_shared(CodecAAC, (char *) data, bytes, (uint32_t)dts, 0, ADTS_HEADER_LEN)); break; } @@ -187,11 +191,18 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d case PSI_STREAM_AUDIO_G711A: case PSI_STREAM_AUDIO_G711U: { auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U; + if (!_tracks[TrackAudio]) { + //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 + onTrack(std::make_shared(codec, 8000, 1, 16)); + } onFrame(std::make_shared(codec, (char *) data, bytes, (uint32_t)dts)); break; } case PSI_STREAM_AUDIO_OPUS: { + if (!_tracks[TrackAudio]) { + onTrack(std::make_shared()); + } onFrame(std::make_shared(CodecOpus, (char *) data, bytes, (uint32_t)dts)); break; } @@ -212,6 +223,7 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes, #endif void DecoderImp::onTrack(const Track::Ptr &track) { + _tracks[track->getTrackType()] = track; _sink->addTrack(track); } diff --git a/src/Rtp/Decoder.h b/src/Rtp/Decoder.h index a75a79f8..2f69f7c7 100644 --- a/src/Rtp/Decoder.h +++ b/src/Rtp/Decoder.h @@ -61,6 +61,7 @@ private: MediaSinkInterface *_sink; FrameMerger _merger{FrameMerger::none}; Ticker _last_unsported_print; + Track::Ptr _tracks[TrackMax]; }; }//namespace mediakit diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 7601bfdf..3cc50dbb 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -61,7 +61,7 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 _ssrc_alive[index].resetTime(); } else { //ssrc错误 - if (_ssrc_alive[index].elapsedTime() < 10 * 1000) { + if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { //接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; return false; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c14a7978..1e12adee 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -99,11 +99,16 @@ void RtspSession::onManager() { } } - if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000 && _enable_send_rtp) { - //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 - shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); + if (_push_src && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) { + //推流超时 + shutdown(SockException(Err_timeout, "pusher session timeouted")); return; } + + if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _enable_send_rtp && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) { + //rtp over udp播放器超时 + shutdown(SockException(Err_timeout, "rtp over udp player timeouted")); + } } void RtspSession::onRecv(const Buffer::Ptr &buf) { diff --git a/tests/test_pusherMp4.cpp b/tests/test_pusherMp4.cpp index a33bf0a3..c72f52b1 100644 --- a/tests/test_pusherMp4.cpp +++ b/tests/test_pusherMp4.cpp @@ -24,9 +24,9 @@ using namespace toolkit; using namespace mediakit; //推流器,保持强引用 -MediaPusher::Ptr pusher; +MediaPusher::Ptr g_pusher; Timer::Ptr g_timer; - +MediaSource::Ptr g_src; //声明函数 //推流失败或断开延迟2秒后重试推流 @@ -36,7 +36,7 @@ void rePushDelay(const EventPoller::Ptr &poller, const string &app, const string &stream, const string &filePath, - const string &url) ; + const string &url); //创建推流器并开始推流 void createPusher(const EventPoller::Ptr &poller, @@ -46,36 +46,39 @@ void createPusher(const EventPoller::Ptr &poller, const string &stream, const string &filePath, const string &url) { - //不限制APP名,并且指定文件绝对路径 - auto src = MediaSource::createFromMP4(schema,vhost,app,stream,filePath, false); - if(!src){ + if (!g_src) { + //不限制APP名,并且指定文件绝对路径 + g_src = MediaSource::createFromMP4(schema, vhost, app, stream, filePath, false); + } + if (!g_src) { //文件不存在 WarnL << "MP4文件不存在:" << filePath; return; } //创建推流器并绑定一个MediaSource - pusher.reset(new MediaPusher(src,poller)); + g_pusher.reset(new MediaPusher(g_src, poller)); //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp -// (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; + //(*g_pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 - pusher->setOnShutdown([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { + g_pusher->setOnShutdown([poller, schema, vhost, app, stream, filePath, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重新推流 - rePushDelay(poller,schema,vhost,app, stream,filePath, url); + rePushDelay(poller, schema, vhost, app, stream, filePath, url); }); + //设置发布结果处理逻辑 - pusher->setOnPublished([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { + g_pusher->setOnPublished([poller, schema, vhost, app, stream, filePath, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(poller,schema,vhost,app, stream, filePath ,url); - }else { + rePushDelay(poller, schema, vhost, app, stream, filePath, url); + } else { InfoL << "Publish success,Please play with player:" << url; } }); - pusher->publish(url); + g_pusher->publish(url); } //推流失败或断开延迟2秒后重试推流 @@ -86,39 +89,44 @@ void rePushDelay(const EventPoller::Ptr &poller, const string &stream, const string &filePath, const string &url) { - g_timer = std::make_shared(2.0f,[poller,schema,vhost,app, stream, filePath,url]() { + g_timer = std::make_shared(2.0f, [poller, schema, vhost, app, stream, filePath, url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(poller,schema,vhost,app, stream, filePath,url); + createPusher(poller, schema, vhost, app, stream, filePath, url); //此任务不重复 return false; }, poller); } //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 -int domain(const string & filePath,const string & pushUrl){ +int domain(const string &filePath, const string &pushUrl) { //设置日志 Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); + //循环点播mp4文件 + mINI::Instance()[Record::kFileRepeat] = 1; + mINI::Instance()[General::kHlsDemand] = 1; + mINI::Instance()[General::kTSDemand] = 1; + mINI::Instance()[General::kFMP4Demand] = 1; + //mINI::Instance()[General::kRtspDemand] = 1; + //mINI::Instance()[General::kRtmpDemand] = 1; + auto poller = EventPollerPool::Instance().getPoller(); //vhost/app/stream可以随便自己填,现在不限制app应用名了 - createPusher(poller,FindField(pushUrl.data(), nullptr,"://").substr(0,4),DEFAULT_VHOST,"live","stream",filePath,pushUrl); - + createPusher(poller, FindField(pushUrl.data(), nullptr, "://").substr(0, 4), DEFAULT_VHOST, "live", "stream", filePath, pushUrl); //设置退出信号处理函数 static semaphore sem; signal(SIGINT, [](int) { sem.post(); });// 设置退出信号 sem.wait(); - pusher.reset(); + g_pusher.reset(); g_timer.reset(); return 0; } - - -int main(int argc,char *argv[]){ +int main(int argc, char *argv[]) { //可以使用test_server生成的mp4文件 //文件使用绝对路径,推流url支持rtsp和rtmp - return domain("/Users/xzl/git/ZLMediaKit/release/mac/Debug/www/record/live/rtsp_test1/2020-04-03/15-32-24.mp4","rtsp://127.0.0.1/live/rtsp_push"); + return domain("/home/work/test2.mp4", "rtmp://127.0.0.1/live/rtsp_push"); } diff --git a/webrtc/Sdp.cpp b/webrtc/Sdp.cpp index 5bd993b3..369b3ca8 100644 --- a/webrtc/Sdp.cpp +++ b/webrtc/Sdp.cpp @@ -1421,7 +1421,7 @@ void RtcConfigure::RtcTrackConfigure::setDefaultSetting(TrackType type){ } case TrackVideo: { //此处调整偏好的编码格式优先级 - preferred_codec = {CodecH264, CodecH265}; + preferred_codec = {CodecH264, CodecH265, CodecAV1}; rtcp_fb = {SdpConst::kTWCCRtcpFb, SdpConst::kRembRtcpFb, "nack", "ccm fir", "nack pli"}; extmap = { RtpExtType::abs_send_time, diff --git a/webrtc/offer.sdp b/webrtc/offer.sdp index 762150d8..c3b09302 100644 --- a/webrtc/offer.sdp +++ b/webrtc/offer.sdp @@ -1,5 +1,5 @@ v=0 -o=- 257973874652185302 2 IN IP4 127.0.0.1 +o=- 8056465047193717905 2 IN IP4 127.0.0.1 s=- t=0 0 a=group:BUNDLE 0 1 @@ -8,10 +8,10 @@ a=msid-semantic: WMS m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 110 112 113 126 c=IN IP4 0.0.0.0 a=rtcp:9 IN IP4 0.0.0.0 -a=ice-ufrag:w2IN -a=ice-pwd:X7kCoPoI2NqW8kxuV9LHRR78 +a=ice-ufrag:LtFR +a=ice-pwd:sUVVlvhNoL2g/GL36TyfZGwP a=ice-options:trickle -a=fingerprint:sha-256 7A:A7:A4:9A:BC:37:64:68:9C:48:E5:E9:9B:97:BD:88:17:3E:E5:44:29:4D:6D:BB:AB:2C:85:B8:DE:7A:15:B1 +a=fingerprint:sha-256 21:21:07:E8:3C:D0:3B:45:87:9A:31:86:DE:4F:C1:BA:E1:0E:96:BA:41:36:6E:3A:3F:C6:C8:92:95:5B:71:5F a=setup:actpass a=mid:0 a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level @@ -21,7 +21,7 @@ a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id a=sendrecv -a=msid:- 56049f63-4b19-45c4-aa0a-8895049b5430 +a=msid:- 2ebeb64c-2eb3-4c4f-b5d5-d578245b969e a=rtcp-mux a=rtpmap:111 opus/48000/2 a=rtcp-fb:111 transport-cc @@ -38,17 +38,17 @@ a=rtpmap:110 telephone-event/48000 a=rtpmap:112 telephone-event/32000 a=rtpmap:113 telephone-event/16000 a=rtpmap:126 telephone-event/8000 -a=ssrc:3304267696 cname:sCv+hHL1+2UbfMTB -a=ssrc:3304267696 msid:- 56049f63-4b19-45c4-aa0a-8895049b5430 -a=ssrc:3304267696 mslabel:- -a=ssrc:3304267696 label:56049f63-4b19-45c4-aa0a-8895049b5430 -m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 124 119 123 118 114 115 116 +a=ssrc:905965261 cname:7iEkMV0/MMfqSEce +a=ssrc:905965261 msid:- 2ebeb64c-2eb3-4c4f-b5d5-d578245b969e +a=ssrc:905965261 mslabel:- +a=ssrc:905965261 label:2ebeb64c-2eb3-4c4f-b5d5-d578245b969e +m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 35 36 124 119 123 118 114 115 116 c=IN IP4 0.0.0.0 a=rtcp:9 IN IP4 0.0.0.0 -a=ice-ufrag:w2IN -a=ice-pwd:X7kCoPoI2NqW8kxuV9LHRR78 +a=ice-ufrag:LtFR +a=ice-pwd:sUVVlvhNoL2g/GL36TyfZGwP a=ice-options:trickle -a=fingerprint:sha-256 7A:A7:A4:9A:BC:37:64:68:9C:48:E5:E9:9B:97:BD:88:17:3E:E5:44:29:4D:6D:BB:AB:2C:85:B8:DE:7A:15:B1 +a=fingerprint:sha-256 21:21:07:E8:3C:D0:3B:45:87:9A:31:86:DE:4F:C1:BA:E1:0E:96:BA:41:36:6E:3A:3F:C6:C8:92:95:5B:71:5F a=setup:actpass a=mid:1 a=extmap:14 urn:ietf:params:rtp-hdrext:toffset @@ -63,7 +63,7 @@ a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid a=extmap:5 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id a=extmap:6 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id a=sendrecv -a=msid:- a73b3f5f-007a-4a46-ac8b-582ca7fee460 +a=msid:- f36bb41d-d05d-4310-b05b-7913d0029b18 a=rtcp-mux a=rtcp-rsize a=rtpmap:96 VP8/90000 @@ -128,13 +128,21 @@ a=rtcp-fb:108 nack pli a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f a=rtpmap:109 rtx/90000 a=fmtp:109 apt=108 +a=rtpmap:35 AV1X/90000 +a=rtcp-fb:35 goog-remb +a=rtcp-fb:35 transport-cc +a=rtcp-fb:35 ccm fir +a=rtcp-fb:35 nack +a=rtcp-fb:35 nack pli +a=rtpmap:36 rtx/90000 +a=fmtp:36 apt=35 a=rtpmap:124 H264/90000 a=rtcp-fb:124 goog-remb a=rtcp-fb:124 transport-cc a=rtcp-fb:124 ccm fir a=rtcp-fb:124 nack a=rtcp-fb:124 nack pli -a=fmtp:124 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=4d001f +a=fmtp:124 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=4d0032 a=rtpmap:119 rtx/90000 a=fmtp:119 apt=124 a=rtpmap:123 H264/90000 @@ -143,19 +151,19 @@ a=rtcp-fb:123 transport-cc a=rtcp-fb:123 ccm fir a=rtcp-fb:123 nack a=rtcp-fb:123 nack pli -a=fmtp:123 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=64001f +a=fmtp:123 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032 a=rtpmap:118 rtx/90000 a=fmtp:118 apt=123 a=rtpmap:114 red/90000 a=rtpmap:115 rtx/90000 a=fmtp:115 apt=114 a=rtpmap:116 ulpfec/90000 -a=ssrc-group:FID 1128910219 3552306261 -a=ssrc:1128910219 cname:sCv+hHL1+2UbfMTB -a=ssrc:1128910219 msid:- a73b3f5f-007a-4a46-ac8b-582ca7fee460 -a=ssrc:1128910219 mslabel:- -a=ssrc:1128910219 label:a73b3f5f-007a-4a46-ac8b-582ca7fee460 -a=ssrc:3552306261 cname:sCv+hHL1+2UbfMTB -a=ssrc:3552306261 msid:- a73b3f5f-007a-4a46-ac8b-582ca7fee460 -a=ssrc:3552306261 mslabel:- -a=ssrc:3552306261 label:a73b3f5f-007a-4a46-ac8b-582ca7fee460 \ No newline at end of file +a=ssrc-group:FID 2678501654 361960375 +a=ssrc:2678501654 cname:7iEkMV0/MMfqSEce +a=ssrc:2678501654 msid:- f36bb41d-d05d-4310-b05b-7913d0029b18 +a=ssrc:2678501654 mslabel:- +a=ssrc:2678501654 label:f36bb41d-d05d-4310-b05b-7913d0029b18 +a=ssrc:361960375 cname:7iEkMV0/MMfqSEce +a=ssrc:361960375 msid:- f36bb41d-d05d-4310-b05b-7913d0029b18 +a=ssrc:361960375 mslabel:- +a=ssrc:361960375 label:f36bb41d-d05d-4310-b05b-7913d0029b18 \ No newline at end of file