diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index 30b4cb7c..f407e17d 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -81,8 +81,6 @@ API_EXPORT const char* API_CALL mk_parser_get_content(const mk_parser ctx, int * typedef void* mk_media_info; //MediaInfo::_param_strs API_EXPORT const char* API_CALL mk_media_info_get_params(const mk_media_info ctx); -//MediaInfo["key"] -API_EXPORT const char* API_CALL mk_media_info_get_param(const mk_media_info ctx,const char *key); //MediaInfo::_schema API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx); //MediaInfo::_vhost diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index cc341cc8..3818ad47 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -147,11 +147,7 @@ API_EXPORT const char* API_CALL mk_media_info_get_params(const mk_media_info ctx MediaInfo *info = (MediaInfo *)ctx; return info->_param_strs.c_str(); } -API_EXPORT const char* API_CALL mk_media_info_get_param(const mk_media_info ctx,const char *key){ - assert(ctx && key); - MediaInfo *info = (MediaInfo *)ctx; - return info->_params[key].c_str(); -} + API_EXPORT const char* API_CALL mk_media_info_get_schema(const mk_media_info ctx){ assert(ctx); MediaInfo *info = (MediaInfo *)ctx; diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 0ce721b6..fc8083a3 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -135,30 +135,29 @@ public: //获取HTTP请求中url参数、content参数 static ApiArgsType getAllArgs(const Parser &parser) { ApiArgsType allArgs; - if(parser["Content-Type"].find("application/x-www-form-urlencoded") == 0){ + if (parser["Content-Type"].find("application/x-www-form-urlencoded") == 0) { auto contentArgs = parser.parseArgs(parser.Content()); for (auto &pr : contentArgs) { allArgs[pr.first] = HttpSession::urlDecode(pr.second); } - }else if(parser["Content-Type"].find("application/json") == 0){ + } else if (parser["Content-Type"].find("application/json") == 0) { try { stringstream ss(parser.Content()); Value jsonArgs; ss >> jsonArgs; auto keys = jsonArgs.getMemberNames(); - for (auto key = keys.begin(); key != keys.end(); ++key){ + for (auto key = keys.begin(); key != keys.end(); ++key) { allArgs[*key] = jsonArgs[*key].asString(); } - }catch (std::exception &ex){ + } catch (std::exception &ex) { WarnL << ex.what(); } - }else if(!parser["Content-Type"].empty()){ + } else if (!parser["Content-Type"].empty()) { WarnL << "invalid Content-Type:" << parser["Content-Type"]; } - auto &urlArgs = parser.getUrlArgs(); - for (auto &pr : urlArgs) { - allArgs[pr.first] = HttpSession::urlDecode(pr.second); + for (auto &pr : parser.getUrlArgs()) { + allArgs[pr.first] = pr.second; } return std::move(allArgs); } diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index ff0b6094..7ab8b574 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -362,9 +362,9 @@ void MediaInfo::parse(const string &url){ if(pos != string::npos){ _streamid = steamid.substr(0,pos); _param_strs = steamid.substr(pos + 1); - _params = Parser::parseArgs(_param_strs); - if(_params.find(VHOST_KEY) != _params.end()){ - _vhost = _params[VHOST_KEY]; + auto params = Parser::parseArgs(_param_strs); + if(params.find(VHOST_KEY) != params.end()){ + _vhost = params[VHOST_KEY]; } } else{ _streamid = steamid; diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 2b2e799b..385d6d8c 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -72,19 +72,15 @@ public: virtual int totalReaderCount(MediaSource &sender) = 0; }; +/** + * 解析url获取媒体相关信息 + */ class MediaInfo{ public: MediaInfo(){} - MediaInfo(const string &url){ - parse(url); - } ~MediaInfo(){} - + MediaInfo(const string &url){ parse(url); } void parse(const string &url); - - string &operator[](const string &key){ - return _params[key]; - } public: string _schema; string _host; @@ -92,7 +88,6 @@ public: string _vhost; string _app; string _streamid; - StrCaseMap _params; string _param_strs; }; @@ -118,21 +113,27 @@ public: const string& getApp() const; // 流id const string& getId() const; - // 获取所有Track - vector getTracks(bool trackReady = true) const override; - // 获取监听者 - const std::weak_ptr& getListener() const; // 设置TrackSource void setTrackSource(const std::weak_ptr &track_src); + // 获取所有Track + vector getTracks(bool trackReady = true) const override; + // 设置监听者 virtual void setListener(const std::weak_ptr &listener); + // 获取监听者 + const std::weak_ptr& getListener() const; + + // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数 virtual int readerCount() = 0; // 观看者个数,包括(hls/rtsp/rtmp) virtual int totalReaderCount(); + // 获取流当前时间戳 virtual uint32_t getTimeStamp(TrackType trackType) { return 0; }; + // 设置时间戳 + virtual void setTimeStamp(uint32_t uiStamp) {}; // 拖动进度条 bool seekTo(uint32_t ui32Stamp); @@ -147,12 +148,10 @@ public: static void findAsync(const MediaInfo &info, const std::shared_ptr &session, const function &cb); // 遍历所有流 static void for_each_media(const function &cb); - protected: void regist() ; bool unregist() ; void unregisted(); - private: string _strSchema; string _strVhost; diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index bb977f7b..b711f638 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -31,6 +31,7 @@ #include "Rtmp/RtmpMediaSourceMuxer.h" #include "Record/Recorder.h" #include "Record/HlsMediaSource.h" +#include "Record/HlsRecorder.h" class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this{ public: @@ -42,32 +43,29 @@ public: }; typedef std::shared_ptr Ptr; - MultiMediaSourceMuxer(const string &vhost, - const string &strApp, - const string &strId, - float dur_sec = 0.0, - bool bEanbleRtsp = true, - bool bEanbleRtmp = true, - bool bEanbleHls = true, - bool bEnableMp4 = false){ - if (bEanbleRtmp) { - _rtmp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0, + bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false){ + if (enable_rtmp) { + _rtmp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); } - if (bEanbleRtsp) { - _rtsp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + if (enable_rtsp) { + _rtsp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); } - if(bEanbleHls){ - Recorder::startRecord(Recorder::type_hls,vhost, strApp, strId, true, false); + if(enable_hls){ + Recorder::startRecord(Recorder::type_hls,vhost, app, stream, true, false); } - if(bEnableMp4){ - Recorder::startRecord(Recorder::type_mp4,vhost, strApp, strId, true, false); + if(enable_mp4){ + Recorder::startRecord(Recorder::type_mp4,vhost, app, stream, true, false); } - _get_hls_player = [vhost,strApp,strId](){ - auto src = MediaSource::find(HLS_SCHEMA,vhost,strApp,strId); - return src ? src->readerCount() : 0; + _get_hls_media_source = [vhost,app,stream](){ + auto recorder = dynamic_pointer_cast(Recorder::getRecorder(Recorder::type_hls,vhost,app,stream)); + if(recorder){ + return recorder->getMediaSource(); + } + return MediaSource::Ptr(); }; } virtual ~MultiMediaSourceMuxer(){} @@ -92,9 +90,15 @@ public: if(_rtmp) { _rtmp->setListener(listener); } + if(_rtsp) { _rtsp->setListener(listener); } + + auto hls_src = _get_hls_media_source(); + if(hls_src){ + hls_src->setListener(listener); + } } /** @@ -102,10 +106,15 @@ public: * @return */ int totalReaderCount() const{ - return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) + _get_hls_player(); + auto hls_src = _get_hls_media_source(); + return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) + (hls_src ? hls_src->readerCount() : 0); } void setTimeStamp(uint32_t stamp){ + if(_rtmp){ + _rtmp->setTimeStamp(stamp); + } + if(_rtsp){ _rtsp->setTimeStamp(stamp); } @@ -154,6 +163,11 @@ protected: _rtsp->onAllTrackReady(); } + auto hls_src = _get_hls_media_source(); + if(hls_src){ + hls_src->setTrackSource(shared_from_this()); + } + if(_listener){ _listener->onAllTrackReady(); } @@ -162,7 +176,7 @@ private: RtmpMediaSourceMuxer::Ptr _rtmp; RtspMediaSourceMuxer::Ptr _rtsp; Listener *_listener = nullptr; - function _get_hls_player; + function _get_hls_media_source; }; diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index 2c7825bb..39fe77b9 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -118,8 +118,12 @@ std::shared_ptr HlsMakerImp::makeFile(const string &file,bool setbuf) { return ret; } -void HlsMakerImp::setMediaInfo(const string &vhost, const string &app, const string &stream_id) { +void HlsMakerImp::setMediaSource(const string &vhost, const string &app, const string &stream_id) { _media_src = std::make_shared(vhost, app, stream_id); } +MediaSource::Ptr HlsMakerImp::getMediaSource() const{ + return _media_src; +} + }//namespace mediakit \ No newline at end of file diff --git a/src/Record/HlsMakerImp.h b/src/Record/HlsMakerImp.h index 3a267243..d12aef5d 100644 --- a/src/Record/HlsMakerImp.h +++ b/src/Record/HlsMakerImp.h @@ -51,7 +51,13 @@ public: * @param app 应用名 * @param stream_id 流id */ - void setMediaInfo(const string &vhost, const string &app, const string &stream_id); + void setMediaSource(const string &vhost, const string &app, const string &stream_id); + + /** + * 获取MediaSource + * @return + */ + MediaSource::Ptr getMediaSource() const; protected: string onOpenSegment(int index) override ; void onDelSegment(int index) override; diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index c29d27cd..d9e20e33 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -34,6 +34,7 @@ namespace mediakit { class HlsRecorder : public TsMuxer { public: + typedef std::shared_ptr Ptr; HlsRecorder(const string &m3u8_file, const string ¶ms){ GET_CONFIG(uint32_t,hlsNum,Hls::kSegmentNum); GET_CONFIG(uint32_t,hlsBufSize,Hls::kFileBufSize); @@ -43,8 +44,12 @@ public: ~HlsRecorder(){ delete _hls; } - void setMediaInfo(const string &vhost, const string &app, const string &stream_id){ - _hls->setMediaInfo(vhost,app,stream_id); + void setMediaSource(const string &vhost, const string &app, const string &stream_id){ + _hls->setMediaSource(vhost, app, stream_id); + } + + MediaSource::Ptr getMediaSource() const{ + return _hls->getMediaSource(); } protected: void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) override { diff --git a/src/Record/Recorder.cpp b/src/Record/Recorder.cpp index d2ad95aa..e3ef8106 100644 --- a/src/Record/Recorder.cpp +++ b/src/Record/Recorder.cpp @@ -55,7 +55,7 @@ MediaSinkInterface *createHlsRecorder(const string &strVhost_tmp, const string & } m3u8FilePath = File::absolutePath(m3u8FilePath, hlsPath); auto ret = new HlsRecorder(m3u8FilePath, params); - ret->setMediaInfo(strVhost,strApp,strId); + ret->setMediaSource(strVhost, strApp, strId); return ret; #else return nullptr; @@ -159,6 +159,10 @@ public: const string &getSchema() const{ return _schema; } + + const MediaSinkInterface::Ptr& getRecorder() const{ + return _recorder; + } private: MediaSinkInterface::Ptr _recorder; vector _tracks; @@ -179,6 +183,16 @@ public: return getRecordStatus_l(getRecorderKey(vhost, app, stream_id)); } + MediaSinkInterface::Ptr getRecorder(const string &vhost, const string &app, const string &stream_id) const{ + auto key = getRecorderKey(vhost, app, stream_id); + lock_guard lck(_recorder_mtx); + auto it = _recorder_map.find(key); + if (it == _recorder_map.end()) { + return nullptr; + } + return it->second->getRecorder(); + } + int startRecord(const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) { auto key = getRecorderKey(vhost, app, stream_id); lock_guard lck(_recorder_mtx); @@ -187,9 +201,8 @@ public: return 0; } - string schema; - auto tracks = findTracks(vhost, app, stream_id,schema); - if (!waitForRecord && tracks.empty()) { + auto src = findMediaSource(vhost, app, stream_id); + if (!waitForRecord && !src) { // 暂时无法开启录制 return -1; } @@ -200,9 +213,17 @@ public: return -2; } auto helper = std::make_shared(recorder, continueRecord); - if(tracks.size()){ - helper->attachTracks(std::move(tracks),schema); + if(src){ + auto tracks = src->getTracks(needTrackReady()); + if(tracks.size()){ + helper->attachTracks(std::move(tracks),src->getSchema()); + } + auto hls_recorder = dynamic_pointer_cast(recorder); + if(hls_recorder){ + hls_recorder->getMediaSource()->setListener(src->getListener()); + } } + _recorder_map[key] = std::move(helper); return 0; } @@ -281,25 +302,27 @@ private: } // 查找MediaSource以便录制 - vector findTracks(const string &vhost, const string &app, const string &stream_id,string &schema) { + MediaSource::Ptr findMediaSource(const string &vhost, const string &app, const string &stream_id) { + bool need_ready = needTrackReady(); auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id); if (src) { - auto ret = src->getTracks(needTrackReady()); + auto ret = src->getTracks(need_ready); if (!ret.empty()) { - schema = RTMP_SCHEMA; - return std::move(ret); + return std::move(src); } } src = MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id); if (src) { - schema = RTSP_SCHEMA; - return src->getTracks(needTrackReady()); + auto ret = src->getTracks(need_ready); + if (!ret.empty()) { + return std::move(src); + } } - return vector(); + return nullptr; } - string getRecorderKey(const string &vhost, const string &app, const string &stream_id) { + string getRecorderKey(const string &vhost, const string &app, const string &stream_id) const{ return vhost + "/" + app + "/" + stream_id; } @@ -335,7 +358,7 @@ private: } } private: - recursive_mutex _recorder_mtx; + mutable recursive_mutex _recorder_mtx; NoticeCenter::Ptr _notice_center; unordered_map _recorder_map; }; @@ -351,6 +374,17 @@ Recorder::status Recorder::getRecordStatus(Recorder::type type, const string &vh return status_not_record; } +std::shared_ptr Recorder::getRecorder(type type, const string &vhost, const string &app, const string &stream_id){ + switch (type){ + case type_mp4: + return MediaSourceWatcher::Instance().getRecorder(vhost,app,stream_id); + case type_hls: + return MediaSourceWatcher::Instance().getRecorder(vhost,app,stream_id); + } + return nullptr; +} + + int Recorder::startRecord(Recorder::type type, const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) { switch (type){ case type_mp4: diff --git a/src/Record/Recorder.h b/src/Record/Recorder.h index 6172cabd..26921214 100644 --- a/src/Record/Recorder.h +++ b/src/Record/Recorder.h @@ -88,6 +88,15 @@ public: * 停止所有录制,一般程序退出时调用 */ static void stopAll(); + + /** + * 获取录制对象 + * @param type hls还是MP4录制 + * @param vhost 虚拟主机 + * @param app 应用名 + * @param stream_id 流id + */ + static std::shared_ptr getRecorder(type type, const string &vhost, const string &app, const string &stream_id); private: Recorder() = delete; ~Recorder() = delete; diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index 1ddb082a..2cecd127 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -49,6 +49,10 @@ public: _mediaSouce->setListener(listener); } + void setTimeStamp(uint32_t stamp){ + _mediaSouce->setTimeStamp(stamp); + } + int readerCount() const{ return _mediaSouce->readerCount(); } diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index f4354ae4..2fc0cc99 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -140,7 +140,7 @@ public: /** * 更新时间戳 */ - virtual void setTimeStamp(uint32_t uiStamp) { + void setTimeStamp(uint32_t uiStamp) override { auto tracks = _sdp_parser.getAvailableTrack(); for (auto &track : tracks) { track->_time_stamp = uiStamp;