2020-09-06 17:56:05 +08:00

639 lines
21 KiB
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
#include <math.h>
#include "MediaSource.h"
#include "Record/MP4Reader.h"
#include "Util/util.h"
#include "Network/sockutil.h"
#include "Network/TcpSession.h"
using namespace toolkit;
namespace mediakit {
recursive_mutex s_media_source_mtx;
MediaSource::SchemaVhostAppStreamMap s_media_source_map;
MediaSource::MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id){
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if (!enableVhost) {
} else {
_vhost = vhost.empty() ? DEFAULT_VHOST : vhost;
_schema = schema;
_app = app;
_stream_id = stream_id;
MediaSource::~MediaSource() {
const string& MediaSource::getSchema() const {
return _schema;
const string& MediaSource::getVhost() const {
return _vhost;
const string& MediaSource::getApp() const {
return _app;
const string& MediaSource::getId() const {
return _stream_id;
vector<Track::Ptr> MediaSource::getTracks(bool ready) const {
auto listener = _listener.lock();
return vector<Track::Ptr>();
return listener->getTracks(const_cast<MediaSource &>(*this), ready);
void MediaSource::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener;
const std::weak_ptr<MediaSourceEvent>& MediaSource::getListener() const{
return _listener;
int MediaSource::totalReaderCount(){
auto listener = _listener.lock();
return readerCount();
return listener->totalReaderCount(*this);
bool MediaSource::seekTo(uint32_t stamp) {
auto listener = _listener.lock();
return false;
return listener->seekTo(*this, stamp);
bool MediaSource::close(bool force) {
auto listener = _listener.lock();
return false;
return listener->close(*this,force);
void MediaSource::onNoneReader(){
auto listener = _listener.lock();
if (listener->totalReaderCount(*this) == 0) {
bool MediaSource::setupRecord(Recorder::type type, bool start, const string &custom_path){
auto listener = _listener.lock();
if (!listener) {
WarnL << "未设置MediaSource的事件监听者setupRecord失败:" << getSchema() << "/" << getVhost() << "/" << getApp() << "/" << getId();
return false;
return listener->setupRecord(*this, type, start, custom_path);
bool MediaSource::isRecording(Recorder::type type){
auto listener = _listener.lock();
return false;
return listener->isRecording(*this, type);
void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
auto listener = _listener.lock();
if (!listener) {
cb(SockException(Err_other, "尚未设置事件监听器"));
return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb);
bool MediaSource::stopSendRtp() {
auto listener = _listener.lock();
if (!listener) {
return false;
return listener->stopSendRtp(*this);
void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) {
decltype(s_media_source_map) copy;
lock_guard<recursive_mutex> lock(s_media_source_mtx);
copy = s_media_source_map;
for (auto &pr0 : copy) {
for (auto &pr1 : pr0.second) {
for (auto &pr2 : pr1.second) {
for (auto &pr3 : pr2.second) {
auto src = pr3.second.lock();
template<typename MAP, typename FUNC>
static bool searchMedia(MAP &map, const string &schema, const string &vhost, const string &app, const string &id, FUNC &&func) {
auto it0 = map.find(schema);
if (it0 == map.end()) {
return false;
auto it1 = it0->second.find(vhost);
if (it1 == it0->second.end()) {
return false;
auto it2 = it1->second.find(app);
if (it2 == it1->second.end()) {
return false;
auto it3 = it2->second.find(id);
if (it3 == it2->second.end()) {
return false;
return func(it0, it1, it2, it3);
template<typename MAP, typename IT0, typename IT1, typename IT2>
static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) {
if (it2->second.empty()) {
if (it1->second.empty()) {
if (it0->second.empty()) {
static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, const string &app, const string &id, bool create_new) {
string vhost = vhost_in;
if(vhost.empty() || !enableVhost){
MediaSource::Ptr ret;
lock_guard<recursive_mutex> lock(s_media_source_mtx);
searchMedia(s_media_source_map, schema, vhost, app, id,
[&](MediaSource::SchemaVhostAppStreamMap::iterator &it0, MediaSource::VhostAppStreamMap::iterator &it1,
MediaSource::AppStreamMap::iterator &it2, MediaSource::StreamMap::iterator &it3) {
ret = it3->second.lock();
if (!ret) {
eraseIfEmpty(s_media_source_map, it0, it1, it2);
return false;
return true;
if(!ret && create_new){
ret = MediaSource::createFromMP4(schema, vhost, app, id);
return ret;
static void findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, bool retry,
const function<void(const MediaSource::Ptr &src)> &cb){
auto src = find_l(info._schema, info._vhost, info._app, info._streamid, true);
if (src || !retry) {
void *listener_tag = session.get();
weak_ptr<TcpSession> weak_session = session;
GET_CONFIG(int, maxWaitMS, General::kMaxStreamWaitTimeMS);
auto on_timeout = session->getPoller()->doDelayTask(maxWaitMS, [cb, listener_tag]() {
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
return 0;
auto cancel_all = [on_timeout, listener_tag]() {
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
function<void()> close_player = [cb, cancel_all]() {
auto on_regist = [weak_session, info, cb, cancel_all](BroadcastMediaChangedArgs) {
auto strong_session = weak_session.lock();
if (!strong_session) {
if (!bRegist ||
sender.getSchema() != info._schema ||
sender.getVhost() != info._vhost ||
sender.getApp() != info._app ||
sender.getId() != info._streamid) {
strong_session->async([weak_session, info, cb]() {
auto strongSession = weak_session.lock();
if (!strongSession) {
DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid;
findAsync_l(info, strongSession, false, cb);
}, false);
NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, on_regist);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream, info, static_cast<SockInfo &>(*session), close_player);
void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session,const function<void(const Ptr &src)> &cb){
return findAsync_l(info, session, true, cb);
MediaSource::Ptr MediaSource::find(const string &schema, const string &vhost, const string &app, const string &id) {
return find_l(schema, vhost, app, id, false);
static string getTrackInfoStr(const TrackSource *track_src){
_StrPrinter codec_info;
auto tracks = track_src->getTracks(true);
for (auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
return codec_info;
void MediaSource::emitEvent(bool regist){
auto listener = _listener.lock();
if (listener) {
listener->onRegist(*this, regist);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, regist, *this);
InfoL << (regist ? "媒体注册:" : "媒体注销:") << _schema << " " << _vhost << " " << _app << " " << _stream_id << " " << getTrackInfoStr(this);
void MediaSource::regist() {
lock_guard<recursive_mutex> lock(s_media_source_mtx);
s_media_source_map[_schema][_vhost][_app][_stream_id] = shared_from_this();
bool MediaSource::unregist() {
bool ret;
lock_guard<recursive_mutex> lock(s_media_source_mtx);
ret = searchMedia(s_media_source_map, _schema, _vhost, _app, _stream_id,
[&](SchemaVhostAppStreamMap::iterator &it0, VhostAppStreamMap::iterator &it1,
AppStreamMap::iterator &it2, StreamMap::iterator &it3) {
auto strong_self = it3->second.lock();
if (strong_self && this != strong_self.get()) {
return false;
eraseIfEmpty(s_media_source_map, it0, it1, it2);
return true;
if (ret) {
return ret;
void MediaInfo::parse(const string &url){
//string url = "rtsp://";
auto schema_pos = url.find("://");
if (schema_pos != string::npos) {
_schema = url.substr(0, schema_pos);
} else {
schema_pos = -3;
auto split_vec = split(url.substr(schema_pos + 3), "/");
if (split_vec.size() > 0) {
auto vhost = split_vec[0];
auto pos = vhost.find(":");
if (pos != string::npos) {
_host = _vhost = vhost.substr(0, pos);
_port = vhost.substr(pos + 1);
} else {
_host = _vhost = vhost;
if (_vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())) {
if (split_vec.size() > 1) {
_app = split_vec[1];
if (split_vec.size() > 2) {
string stream_id;
for (int i = 2; i < split_vec.size(); ++i) {
stream_id.append(split_vec[i] + "/");
if (stream_id.back() == '/') {
auto pos = stream_id.find("?");
if (pos != string::npos) {
_streamid = stream_id.substr(0, pos);
_param_strs = stream_id.substr(pos + 1);
auto params = Parser::parseArgs(_param_strs);
if (params.find(VHOST_KEY) != params.end()) {
_vhost = params[VHOST_KEY];
} else {
_streamid = stream_id;
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if (!enableVhost || _vhost.empty()) {
MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path , bool check_app){
GET_CONFIG(string, appName, Record::kAppName);
if (check_app && app != appName) {
return nullptr;
#ifdef ENABLE_MP4
try {
MP4Reader::Ptr pReader(new MP4Reader(vhost, app, stream, file_path));
return MediaSource::find(schema, vhost, app, stream);
} catch (std::exception &ex) {
WarnL << ex.what();
return nullptr;
WarnL << "创建MP4点播失败请编译时打开\"ENABLE_MP4\"选项";
return nullptr;
#endif //ENABLE_MP4
void MediaSourceEvent::onNoneReader(MediaSource &sender){
GET_CONFIG(string, record_app, Record::kAppName);
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
//如果mp4点播, 无人观看时我们强制关闭点播
bool is_mp4_vod = sender.getApp() == record_app;
weak_ptr<MediaSource> weakSender = sender.shared_from_this();
_async_close_timer = std::make_shared<Timer>(stream_none_reader_delay / 1000.0, [weakSender,is_mp4_vod]() {
auto strongSender = weakSender.lock();
if (!strongSender) {
return false;
if (strongSender->totalReaderCount() != 0) {
return false;
WarnL << "无人观看事件:"
<< strongSender->getSchema() << "/"
<< strongSender->getVhost() << "/"
<< strongSender->getApp() << "/"
<< strongSender->getId();
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strongSender);
WarnL << "MP4点播无人观看,自动关闭:"
<< strongSender->getSchema() << "/"
<< strongSender->getVhost() << "/"
<< strongSender->getApp() << "/"
<< strongSender->getId();
return false;
}, nullptr);
bool MediaSourceEventInterceptor::seekTo(MediaSource &sender, uint32_t stamp) {
auto listener = _listener.lock();
if (!listener) {
return false;
return listener->seekTo(sender, stamp);
bool MediaSourceEventInterceptor::close(MediaSource &sender, bool force) {
auto listener = _listener.lock();
if (!listener) {
return false;
return listener->close(sender, force);
int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) {
auto listener = _listener.lock();
if (!listener) {
return sender.readerCount();
return listener->totalReaderCount(sender);
void MediaSourceEventInterceptor::onNoneReader(MediaSource &sender) {
auto listener = _listener.lock();
if (!listener) {
void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) {
auto listener = _listener.lock();
if (listener) {
listener->onRegist(sender, regist);
bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
auto listener = _listener.lock();
if (!listener) {
return false;
return listener->setupRecord(sender, type, start, custom_path);
bool MediaSourceEventInterceptor::isRecording(MediaSource &sender, Recorder::type type) {
auto listener = _listener.lock();
if (!listener) {
return false;
return listener->isRecording(sender, type);
vector<Track::Ptr> MediaSourceEventInterceptor::getTracks(MediaSource &sender, bool trackReady) const {
auto listener = _listener.lock();
if (!listener) {
return vector<Track::Ptr>();
return listener->getTracks(sender, trackReady);
void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
auto listener = _listener.lock();
if (listener) {
listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
} else {
MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){
auto listener = _listener.lock();
if (listener) {
return listener->stopSendRtp(sender);
return false;
static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
if (new_stamp + 500 < last_stamp) {
return true;
return last_stamp != new_stamp || cache_size >= 1024;
static bool isFlushAble_merge(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) {
if (new_stamp + 500 < last_stamp) {
return true;
if (new_stamp > last_stamp + merge_ms) {
return true;
return cache_size >= 1024;
bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size) {
bool flush_flag = false;
if (is_key && is_video) {
flush_flag = true;
} else {
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if (mergeWriteMS <= 0) {
flush_flag = isFlushAble_default(is_video, _last_stamp[is_video], new_stamp, cache_size);
} else {
flush_flag = isFlushAble_merge(is_video, _last_stamp[is_video], new_stamp, cache_size, mergeWriteMS);
if (flush_flag) {
_last_stamp[is_video] = new_stamp;
return flush_flag;
} /* namespace mediakit */