!1 多路RTP 流输出

Merge pull request !1 from kingyuanyuan/branch
This commit is contained in:
kingyuanyuan 2020-11-27 17:26:52 +08:00 committed by Gitee
commit 62130f77e6
9 changed files with 3449 additions and 3431 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,170 +1,170 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#include "MediaSink.h" #include "MediaSink.h"
//最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track //最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track
#define MAX_WAIT_MS_READY 10000 #define MAX_WAIT_MS_READY 10000
//如果添加Track最多等待5秒 //如果添加Track最多等待5秒
#define MAX_WAIT_MS_ADD_TRACK 5000 #define MAX_WAIT_MS_ADD_TRACK 1000
namespace mediakit{ namespace mediakit{
void MediaSink::addTrack(const Track::Ptr &track_in) { void MediaSink::addTrack(const Track::Ptr &track_in) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
if (_all_track_ready) { if (_all_track_ready) {
WarnL << "all track is ready, add this track too late!"; WarnL << "all track is ready, add this track too late!";
return; return;
} }
//克隆Track只拷贝其数据不拷贝其数据转发关系 //克隆Track只拷贝其数据不拷贝其数据转发关系
auto track = track_in->clone(); auto track = track_in->clone();
auto codec_id = track->getCodecId(); auto codec_id = track->getCodecId();
_track_map[codec_id] = track; _track_map[codec_id] = track;
_track_ready_callback[codec_id] = [this, track]() { _track_ready_callback[codec_id] = [this, track]() {
onTrackReady(track); onTrackReady(track);
}; };
_ticker.resetTime(); _ticker.resetTime();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) { track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if (_all_track_ready) { if (_all_track_ready) {
onTrackFrame(frame); onTrackFrame(frame);
} else { } else {
//还有Track未就绪先缓存之 //还有Track未就绪先缓存之
_frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame)); _frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame));
} }
})); }));
} }
void MediaSink::resetTracks() { void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
_all_track_ready = false; _all_track_ready = false;
_track_map.clear(); _track_map.clear();
_track_ready_callback.clear(); _track_ready_callback.clear();
_ticker.resetTime(); _ticker.resetTime();
_max_track_size = 2; _max_track_size = 2;
_frame_unread.clear(); _frame_unread.clear();
} }
void MediaSink::inputFrame(const Frame::Ptr &frame) { void MediaSink::inputFrame(const Frame::Ptr &frame) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
auto it = _track_map.find(frame->getCodecId()); auto it = _track_map.find(frame->getCodecId());
if (it == _track_map.end()) { if (it == _track_map.end()) {
return; return;
} }
it->second->inputFrame(frame); it->second->inputFrame(frame);
checkTrackIfReady(nullptr); checkTrackIfReady(nullptr);
} }
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){ void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
//Track由未就绪状态转换成就绪状态我们就触发onTrackReady回调 //Track由未就绪状态转换成就绪状态我们就触发onTrackReady回调
auto it_callback = _track_ready_callback.find(track->getCodecId()); auto it_callback = _track_ready_callback.find(track->getCodecId());
if (it_callback != _track_ready_callback.end() && track->ready()) { if (it_callback != _track_ready_callback.end() && track->ready()) {
it_callback->second(); it_callback->second();
_track_ready_callback.erase(it_callback); _track_ready_callback.erase(it_callback);
} }
} }
void MediaSink::checkTrackIfReady(const Track::Ptr &track){ void MediaSink::checkTrackIfReady(const Track::Ptr &track){
if (!_all_track_ready && !_track_ready_callback.empty()) { if (!_all_track_ready && !_track_ready_callback.empty()) {
if (track) { if (track) {
checkTrackIfReady_l(track); checkTrackIfReady_l(track);
} else { } else {
for (auto &pr : _track_map) { for (auto &pr : _track_map) {
checkTrackIfReady_l(pr.second); checkTrackIfReady_l(pr.second);
} }
} }
} }
if(!_all_track_ready){ if(!_all_track_ready){
if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){ if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
//如果超过规定时间那么不再等待并忽略未准备好的Track //如果超过规定时间那么不再等待并忽略未准备好的Track
emitAllTrackReady(); emitAllTrackReady();
return; return;
} }
if(!_track_ready_callback.empty()){ if(!_track_ready_callback.empty()){
//在超时时间内如果存在未准备好的Track那么继续等待 //在超时时间内如果存在未准备好的Track那么继续等待
return; return;
} }
if(_track_map.size() == _max_track_size){ if(_track_map.size() == _max_track_size){
//如果已经添加了音视频Track并且不存在未准备好的Track那么说明所有Track都准备好了 //如果已经添加了音视频Track并且不存在未准备好的Track那么说明所有Track都准备好了
emitAllTrackReady(); emitAllTrackReady();
return; return;
} }
if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){ if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
//如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track) //如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track)
emitAllTrackReady(); emitAllTrackReady();
return; return;
} }
} }
} }
void MediaSink::addTrackCompleted(){ void MediaSink::addTrackCompleted(){
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
_max_track_size = _track_map.size(); _max_track_size = _track_map.size();
checkTrackIfReady(nullptr); checkTrackIfReady(nullptr);
} }
void MediaSink::emitAllTrackReady() { void MediaSink::emitAllTrackReady() {
if (_all_track_ready) { if (_all_track_ready) {
return; return;
} }
DebugL << "all track ready use " << _ticker.elapsedTime() << "ms"; DebugL << "all track ready use " << _ticker.elapsedTime() << "ms";
if (!_track_ready_callback.empty()) { if (!_track_ready_callback.empty()) {
//这是超时强制忽略未准备好的Track //这是超时强制忽略未准备好的Track
_track_ready_callback.clear(); _track_ready_callback.clear();
//移除未准备好的Track //移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) { for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) { if (!it->second->ready()) {
WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName(); WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName();
it = _track_map.erase(it); it = _track_map.erase(it);
continue; continue;
} }
++it; ++it;
} }
} }
if (!_track_map.empty()) { if (!_track_map.empty()) {
//最少有一个有效的Track //最少有一个有效的Track
_all_track_ready = true; _all_track_ready = true;
onAllTrackReady(); onAllTrackReady();
//全部Track就绪我们一次性把之前的帧输出 //全部Track就绪我们一次性把之前的帧输出
for(auto &pr : _frame_unread){ for(auto &pr : _frame_unread){
if (_track_map.find(pr.first) == _track_map.end()) { if (_track_map.find(pr.first) == _track_map.end()) {
//该Track已经被移除 //该Track已经被移除
continue; continue;
} }
pr.second.for_each([&](const Frame::Ptr &frame) { pr.second.for_each([&](const Frame::Ptr &frame) {
onTrackFrame(frame); onTrackFrame(frame);
}); });
} }
_frame_unread.clear(); _frame_unread.clear();
} }
} }
vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{ vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{
vector<Track::Ptr> ret; vector<Track::Ptr> ret;
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
for (auto &pr : _track_map){ for (auto &pr : _track_map){
if(trackReady && !pr.second->ready()){ if(trackReady && !pr.second->ready()){
continue; continue;
} }
ret.emplace_back(pr.second); ret.emplace_back(pr.second);
} }
return ret; return ret;
} }
}//namespace mediakit }//namespace mediakit

File diff suppressed because it is too large Load Diff

View File

@ -1,361 +1,361 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#ifndef ZLMEDIAKIT_MEDIASOURCE_H #ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H #define ZLMEDIAKIT_MEDIASOURCE_H
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <memory> #include <memory>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
#include "Common/config.h" #include "Common/config.h"
#include "Common/Parser.h" #include "Common/Parser.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h" #include "Util/NoticeCenter.h"
#include "Util/List.h" #include "Util/List.h"
#include "Network/Socket.h" #include "Network/Socket.h"
#include "Rtsp/Rtsp.h" #include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h" #include "Rtmp/Rtmp.h"
#include "Extension/Track.h" #include "Extension/Track.h"
#include "Record/Recorder.h" #include "Record/Recorder.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
namespace toolkit{ namespace toolkit{
class TcpSession; class TcpSession;
}// namespace toolkit }// namespace toolkit
namespace mediakit { namespace mediakit {
enum class MediaOriginType : uint8_t { enum class MediaOriginType : uint8_t {
unknown = 0, unknown = 0,
rtmp_push , rtmp_push ,
rtsp_push, rtsp_push,
rtp_push, rtp_push,
pull, pull,
ffmpeg_pull, ffmpeg_pull,
mp4_vod, mp4_vod,
device_chn device_chn
}; };
string getOriginTypeString(MediaOriginType type); string getOriginTypeString(MediaOriginType type);
class MediaSource; class MediaSource;
class MediaSourceEvent{ class MediaSourceEvent{
public: public:
friend class MediaSource; friend class MediaSource;
MediaSourceEvent(){}; MediaSourceEvent(){};
virtual ~MediaSourceEvent(){}; virtual ~MediaSourceEvent(){};
// 获取媒体源类型 // 获取媒体源类型
virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; } virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; }
// 获取媒体源url或者文件路径 // 获取媒体源url或者文件路径
virtual string getOriginUrl(MediaSource &sender) const { return ""; } virtual string getOriginUrl(MediaSource &sender) const { return ""; }
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
virtual std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const { return nullptr; } virtual std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const { return nullptr; }
// 通知拖动进度条 // 通知拖动进度条
virtual bool seekTo(MediaSource &sender, uint32_t stamp) { return false; } virtual bool seekTo(MediaSource &sender, uint32_t stamp) { return false; }
// 通知其停止产生流 // 通知其停止产生流
virtual bool close(MediaSource &sender, bool force) { return false; } virtual bool close(MediaSource &sender, bool force) { return false; }
// 获取观看总人数 // 获取观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0; virtual int totalReaderCount(MediaSource &sender) = 0;
// 通知观看人数变化 // 通知观看人数变化
virtual void onReaderChanged(MediaSource &sender, int size); virtual void onReaderChanged(MediaSource &sender, int size);
//流注册或注销事件 //流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {}; virtual void onRegist(MediaSource &sender, bool regist) {};
////////////////////////仅供MultiMediaSourceMuxer对象继承//////////////////////// ////////////////////////仅供MultiMediaSourceMuxer对象继承////////////////////////
// 开启或关闭录制 // 开启或关闭录制
virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; }; virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; };
// 获取录制状态 // 获取录制状态
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
// 获取所有track相关信息 // 获取所有track相关信息
virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); }; virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); };
// 开始发送ps-rtp // 开始发送ps-rtp
virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));}; virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));};
// 停止发送ps-rtp // 停止发送ps-rtp
virtual bool stopSendRtp(MediaSource &sender) {return false; } virtual bool stopSendRtp(MediaSource &sender, const string &ssrc) {return false; }
private: private:
Timer::Ptr _async_close_timer; Timer::Ptr _async_close_timer;
}; };
//该对象用于拦截感兴趣的MediaSourceEvent事件 //该对象用于拦截感兴趣的MediaSourceEvent事件
class MediaSourceEventInterceptor : public MediaSourceEvent{ class MediaSourceEventInterceptor : public MediaSourceEvent{
public: public:
MediaSourceEventInterceptor(){} MediaSourceEventInterceptor(){}
~MediaSourceEventInterceptor() override {} ~MediaSourceEventInterceptor() override {}
void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener); void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener);
std::shared_ptr<MediaSourceEvent> getDelegate() const; std::shared_ptr<MediaSourceEvent> getDelegate() const;
MediaOriginType getOriginType(MediaSource &sender) const override; MediaOriginType getOriginType(MediaSource &sender) const override;
string getOriginUrl(MediaSource &sender) const override; string getOriginUrl(MediaSource &sender) const override;
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
bool seekTo(MediaSource &sender, uint32_t stamp) override; bool seekTo(MediaSource &sender, uint32_t stamp) override;
bool close(MediaSource &sender, bool force) override; bool close(MediaSource &sender, bool force) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void onReaderChanged(MediaSource &sender, int size) override; void onReaderChanged(MediaSource &sender, int size) override;
void onRegist(MediaSource &sender, bool regist) override; void onRegist(MediaSource &sender, bool regist) override;
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
bool isRecording(MediaSource &sender, Recorder::type type) override; bool isRecording(MediaSource &sender, Recorder::type type) override;
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override; vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override; void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) override;
bool stopSendRtp(MediaSource &sender) override; bool stopSendRtp(MediaSource &sender, const string &ssrc) override;
private: private:
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
}; };
/** /**
* url获取媒体相关信息 * url获取媒体相关信息
*/ */
class MediaInfo{ class MediaInfo{
public: public:
~MediaInfo() {} ~MediaInfo() {}
MediaInfo() {} MediaInfo() {}
MediaInfo(const string &url) { parse(url); } MediaInfo(const string &url) { parse(url); }
void parse(const string &url); void parse(const string &url);
public: public:
string _full_url; string _full_url;
string _schema; string _schema;
string _host; string _host;
string _port; string _port;
string _vhost; string _vhost;
string _app; string _app;
string _streamid; string _streamid;
string _param_strs; string _param_strs;
}; };
class BytesSpeed { class BytesSpeed {
public: public:
BytesSpeed() = default; BytesSpeed() = default;
~BytesSpeed() = default; ~BytesSpeed() = default;
/** /**
* *
*/ */
BytesSpeed& operator += (uint64_t bytes) { BytesSpeed& operator += (uint64_t bytes) {
_bytes += bytes; _bytes += bytes;
if (_bytes > 1024 * 1024) { if (_bytes > 1024 * 1024) {
//数据大于1MB就计算一次网速 //数据大于1MB就计算一次网速
computeSpeed(); computeSpeed();
} }
return *this; return *this;
} }
/** /**
* bytes/s * bytes/s
*/ */
int getSpeed() { int getSpeed() {
if (_ticker.elapsedTime() < 1000) { if (_ticker.elapsedTime() < 1000) {
//获取频率小于1秒那么返回上次计算结果 //获取频率小于1秒那么返回上次计算结果
return _speed; return _speed;
} }
return computeSpeed(); return computeSpeed();
} }
private: private:
uint64_t computeSpeed() { uint64_t computeSpeed() {
auto elapsed = _ticker.elapsedTime(); auto elapsed = _ticker.elapsedTime();
if (!elapsed) { if (!elapsed) {
return _speed; return _speed;
} }
_speed = _bytes * 1000 / elapsed; _speed = _bytes * 1000 / elapsed;
_ticker.resetTime(); _ticker.resetTime();
_bytes = 0; _bytes = 0;
return _speed; return _speed;
} }
private: private:
int _speed = 0; int _speed = 0;
uint64_t _bytes = 0; uint64_t _bytes = 0;
Ticker _ticker; Ticker _ticker;
}; };
/** /**
* rtsp/rtmp的直播流都源自该对象 * rtsp/rtmp的直播流都源自该对象
*/ */
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> { class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
public: public:
typedef std::shared_ptr<MediaSource> Ptr; typedef std::shared_ptr<MediaSource> Ptr;
typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap; typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap;
typedef unordered_map<string, StreamMap > AppStreamMap; typedef unordered_map<string, StreamMap > AppStreamMap;
typedef unordered_map<string, AppStreamMap > VhostAppStreamMap; typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap; typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;
MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ; MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ;
virtual ~MediaSource() ; virtual ~MediaSource() ;
////////////////获取MediaSource相关信息//////////////// ////////////////获取MediaSource相关信息////////////////
// 获取协议类型 // 获取协议类型
const string& getSchema() const; const string& getSchema() const;
// 虚拟主机 // 虚拟主机
const string& getVhost() const; const string& getVhost() const;
// 应用名 // 应用名
const string& getApp() const; const string& getApp() const;
// 流id // 流id
const string& getId() const; const string& getId() const;
// 获取所有Track // 获取所有Track
vector<Track::Ptr> getTracks(bool ready = true) const override; vector<Track::Ptr> getTracks(bool ready = true) const override;
// 获取流当前时间戳 // 获取流当前时间戳
virtual uint32_t getTimeStamp(TrackType type) { return 0; }; virtual uint32_t getTimeStamp(TrackType type) { return 0; };
// 设置时间戳 // 设置时间戳
virtual void setTimeStamp(uint32_t stamp) {}; virtual void setTimeStamp(uint32_t stamp) {};
// 获取数据速率单位bytes/s // 获取数据速率单位bytes/s
int getBytesSpeed(); int getBytesSpeed();
// 获取流创建GMT unix时间戳单位秒 // 获取流创建GMT unix时间戳单位秒
uint64_t getCreateStamp() const; uint64_t getCreateStamp() const;
// 获取流上线时间,单位秒 // 获取流上线时间,单位秒
uint64_t getAliveSecond() const; uint64_t getAliveSecond() const;
////////////////MediaSourceEvent相关接口实现//////////////// ////////////////MediaSourceEvent相关接口实现////////////////
// 设置监听者 // 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener); virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取监听者 // 获取监听者
std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const; std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const;
// 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数 // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
virtual int readerCount() = 0; virtual int readerCount() = 0;
// 观看者个数,包括(hls/rtsp/rtmp) // 观看者个数,包括(hls/rtsp/rtmp)
virtual int totalReaderCount(); virtual int totalReaderCount();
// 获取媒体源类型 // 获取媒体源类型
MediaOriginType getOriginType() const; MediaOriginType getOriginType() const;
// 获取媒体源url或者文件路径 // 获取媒体源url或者文件路径
string getOriginUrl() const; string getOriginUrl() const;
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock() const; std::shared_ptr<SockInfo> getOriginSock() const;
// 拖动进度条 // 拖动进度条
bool seekTo(uint32_t stamp); bool seekTo(uint32_t stamp);
// 关闭该流 // 关闭该流
bool close(bool force); bool close(bool force);
// 该流观看人数变化 // 该流观看人数变化
void onReaderChanged(int size); void onReaderChanged(int size);
// 开启或关闭录制 // 开启或关闭录制
bool setupRecord(Recorder::type type, bool start, const string &custom_path); bool setupRecord(Recorder::type type, bool start, const string &custom_path);
// 获取录制状态 // 获取录制状态
bool isRecording(Recorder::type type); bool isRecording(Recorder::type type);
// 开始发送ps-rtp // 开始发送ps-rtp
void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb); void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb);
// 停止发送ps-rtp // 停止发送ps-rtp
bool stopSendRtp(); bool stopSendRtp(const string &ssrc);
////////////////static方法查找或生成MediaSource//////////////// ////////////////static方法查找或生成MediaSource////////////////
// 同步查找流 // 同步查找流
static Ptr find(const string &schema, const string &vhost, const string &app, const string &id); static Ptr find(const string &schema, const string &vhost, const string &app, const string &id);
// 忽略类型同步查找流可能返回rtmp/rtsp/hls类型 // 忽略类型同步查找流可能返回rtmp/rtsp/hls类型
static Ptr find(const string &vhost, const string &app, const string &stream_id); static Ptr find(const string &vhost, const string &app, const string &stream_id);
// 异步查找流 // 异步查找流
static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb); static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
// 遍历所有流 // 遍历所有流
static void for_each_media(const function<void(const Ptr &src)> &cb); static void for_each_media(const function<void(const Ptr &src)> &cb);
// 从mp4文件生成MediaSource // 从mp4文件生成MediaSource
static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path = "", bool check_app = true); static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path = "", bool check_app = true);
protected: protected:
//媒体注册 //媒体注册
void regist(); void regist();
private: private:
//媒体注销 //媒体注销
bool unregist(); bool unregist();
//触发媒体事件 //触发媒体事件
void emitEvent(bool regist); void emitEvent(bool regist);
protected: protected:
BytesSpeed _speed; BytesSpeed _speed;
private: private:
time_t _create_stamp; time_t _create_stamp;
Ticker _ticker; Ticker _ticker;
string _schema; string _schema;
string _vhost; string _vhost;
string _app; string _app;
string _stream_id; string _stream_id;
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
}; };
///缓存刷新策略类 ///缓存刷新策略类
class FlushPolicy { class FlushPolicy {
public: public:
FlushPolicy() = default; FlushPolicy() = default;
~FlushPolicy() = default; ~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size); bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size);
private: private:
uint64_t _last_stamp[2] = {0, 0}; uint64_t _last_stamp[2] = {0, 0};
}; };
/// 合并写缓存模板 /// 合并写缓存模板
/// \tparam packet 包类型 /// \tparam packet 包类型
/// \tparam policy 刷新缓存策略 /// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型 /// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > > template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class PacketCache { class PacketCache {
public: public:
PacketCache(){ PacketCache(){
_cache = std::make_shared<packet_list>(); _cache = std::make_shared<packet_list>();
} }
virtual ~PacketCache() = default; virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) { void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) { if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flushAll(); flushAll();
} }
//追加数据到最后 //追加数据到最后
_cache->emplace_back(std::move(pkt)); _cache->emplace_back(std::move(pkt));
if (key_pos) { if (key_pos) {
_key_pos = key_pos; _key_pos = key_pos;
} }
} }
virtual void clearCache() { virtual void clearCache() {
_cache->clear(); _cache->clear();
} }
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0; virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private: private:
void flushAll() { void flushAll() {
if (_cache->empty()) { if (_cache->empty()) {
return; return;
} }
onFlush(std::move(_cache), _key_pos); onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>(); _cache = std::make_shared<packet_list>();
_key_pos = false; _key_pos = false;
} }
private: private:
bool _key_pos = false; bool _key_pos = false;
policy _policy; policy _policy;
std::shared_ptr<packet_list> _cache; std::shared_ptr<packet_list> _cache;
}; };
} /* namespace mediakit */ } /* namespace mediakit */
#endif //ZLMEDIAKIT_MEDIASOURCE_H #endif //ZLMEDIAKIT_MEDIASOURCE_H

View File

@ -1,472 +1,480 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#include <math.h> #include <math.h>
#include "Common/config.h" #include "Common/config.h"
#include "MultiMediaSourceMuxer.h" #include "MultiMediaSourceMuxer.h"
namespace mediakit { namespace mediakit {
///////////////////////////////MultiMuxerPrivate////////////////////////////////// ///////////////////////////////MultiMuxerPrivate//////////////////////////////////
MultiMuxerPrivate::~MultiMuxerPrivate() {} MultiMuxerPrivate::~MultiMuxerPrivate() {}
MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec, MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_stream_url = vhost + " " + app + " " + stream; _stream_url = vhost + " " + app + " " + stream;
if (enable_rtmp) { if (enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec)); _rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec));
} }
if (enable_rtsp) { if (enable_rtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleSdp>(dur_sec)); _rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleSdp>(dur_sec));
} }
if (enable_hls) { if (enable_hls) {
_hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream)); _hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream));
} }
if (enable_mp4) { if (enable_mp4) {
_mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream);
} }
_ts = std::make_shared<TSMediaSourceMuxer>(vhost, app, stream); _ts = std::make_shared<TSMediaSourceMuxer>(vhost, app, stream);
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
_fmp4 = std::make_shared<FMP4MediaSourceMuxer>(vhost, app, stream); _fmp4 = std::make_shared<FMP4MediaSourceMuxer>(vhost, app, stream);
#endif #endif
} }
void MultiMuxerPrivate::resetTracks() { void MultiMuxerPrivate::resetTracks() {
if (_rtmp) { if (_rtmp) {
_rtmp->resetTracks(); _rtmp->resetTracks();
} }
if (_rtsp) { if (_rtsp) {
_rtsp->resetTracks(); _rtsp->resetTracks();
} }
if (_ts) { if (_ts) {
_ts->resetTracks(); _ts->resetTracks();
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->resetTracks(); _fmp4->resetTracks();
} }
#endif #endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 //拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls; auto hls = _hls;
if (hls) { if (hls) {
hls->resetTracks(); hls->resetTracks();
} }
auto mp4 = _mp4; auto mp4 = _mp4;
if (mp4) { if (mp4) {
mp4->resetTracks(); mp4->resetTracks();
} }
} }
void MultiMuxerPrivate::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) { void MultiMuxerPrivate::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
_listener = listener; _listener = listener;
if (_rtmp) { if (_rtmp) {
_rtmp->setListener(listener); _rtmp->setListener(listener);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->setListener(listener); _rtsp->setListener(listener);
} }
if (_ts) { if (_ts) {
_ts->setListener(listener); _ts->setListener(listener);
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->setListener(listener); _fmp4->setListener(listener);
} }
#endif #endif
auto hls = _hls; auto hls = _hls;
if (hls) { if (hls) {
hls->setListener(listener); hls->setListener(listener);
} }
} }
int MultiMuxerPrivate::totalReaderCount() const { int MultiMuxerPrivate::totalReaderCount() const {
auto hls = _hls; auto hls = _hls;
return (_rtsp ? _rtsp->readerCount() : 0) + return (_rtsp ? _rtsp->readerCount() : 0) +
(_rtmp ? _rtmp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) +
(_ts ? _ts->readerCount() : 0) + (_ts ? _ts->readerCount() : 0) +
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->readerCount() : 0) + (_fmp4 ? _fmp4->readerCount() : 0) +
#endif #endif
(hls ? hls->readerCount() : 0); (hls ? hls->readerCount() : 0);
} }
static std::shared_ptr<MediaSinkInterface> makeRecorder(const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){ static std::shared_ptr<MediaSinkInterface> makeRecorder(const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path); auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path);
for (auto &track : tracks) { for (auto &track : tracks) {
recorder->addTrack(track); recorder->addTrack(track);
} }
return recorder; return recorder;
} }
//此函数可能跨线程调用 //此函数可能跨线程调用
bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path){ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path){
switch (type) { switch (type) {
case Recorder::type_hls : { case Recorder::type_hls : {
if (start && !_hls) { if (start && !_hls) {
//开始录制 //开始录制
auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(getTracks(true), type, custom_path, sender)); auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(getTracks(true), type, custom_path, sender));
if (hls) { if (hls) {
//设置HlsMediaSource的事件监听器 //设置HlsMediaSource的事件监听器
hls->setListener(_listener); hls->setListener(_listener);
} }
_hls = hls; _hls = hls;
} else if (!start && _hls) { } else if (!start && _hls) {
//停止录制 //停止录制
_hls = nullptr; _hls = nullptr;
} }
return true; return true;
} }
case Recorder::type_mp4 : { case Recorder::type_mp4 : {
if (start && !_mp4) { if (start && !_mp4) {
//开始录制 //开始录制
_mp4 = makeRecorder(getTracks(true), type, custom_path, sender); _mp4 = makeRecorder(getTracks(true), type, custom_path, sender);
} else if (!start && _mp4) { } else if (!start && _mp4) {
//停止录制 //停止录制
_mp4 = nullptr; _mp4 = nullptr;
} }
return true; return true;
} }
default : return false; default : return false;
} }
} }
//此函数可能跨线程调用 //此函数可能跨线程调用
bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){ bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){
switch (type){ switch (type){
case Recorder::type_hls : case Recorder::type_hls :
return _hls ? true : false; return _hls ? true : false;
case Recorder::type_mp4 : case Recorder::type_mp4 :
return _mp4 ? true : false; return _mp4 ? true : false;
default: default:
return false; return false;
} }
} }
void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) { void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) {
if (_rtmp) { if (_rtmp) {
_rtmp->setTimeStamp(stamp); _rtmp->setTimeStamp(stamp);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->setTimeStamp(stamp); _rtsp->setTimeStamp(stamp);
} }
} }
void MultiMuxerPrivate::setTrackListener(Listener *listener) { void MultiMuxerPrivate::setTrackListener(Listener *listener) {
_track_listener = listener; _track_listener = listener;
} }
void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
if (_rtmp) { if (_rtmp) {
_rtmp->addTrack(track); _rtmp->addTrack(track);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->addTrack(track); _rtsp->addTrack(track);
} }
if (_ts) { if (_ts) {
_ts->addTrack(track); _ts->addTrack(track);
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->addTrack(track); _fmp4->addTrack(track);
} }
#endif #endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 //拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls; auto hls = _hls;
if (hls) { if (hls) {
hls->addTrack(track); hls->addTrack(track);
} }
auto mp4 = _mp4; auto mp4 = _mp4;
if (mp4) { if (mp4) {
mp4->addTrack(track); mp4->addTrack(track);
} }
} }
bool MultiMuxerPrivate::isEnabled(){ bool MultiMuxerPrivate::isEnabled(){
auto hls = _hls; auto hls = _hls;
return (_rtmp ? _rtmp->isEnabled() : false) || return (_rtmp ? _rtmp->isEnabled() : false) ||
(_rtsp ? _rtsp->isEnabled() : false) || (_rtsp ? _rtsp->isEnabled() : false) ||
(_ts ? _ts->isEnabled() : false) || (_ts ? _ts->isEnabled() : false) ||
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->isEnabled() : false) || (_fmp4 ? _fmp4->isEnabled() : false) ||
#endif #endif
(hls ? hls->isEnabled() : false) || _mp4; (hls ? hls->isEnabled() : false) || _mp4;
} }
void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) {
if (_rtmp) { if (_rtmp) {
_rtmp->inputFrame(frame); _rtmp->inputFrame(frame);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->inputFrame(frame); _rtsp->inputFrame(frame);
} }
if (_ts) { if (_ts) {
_ts->inputFrame(frame); _ts->inputFrame(frame);
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->inputFrame(frame); _fmp4->inputFrame(frame);
} }
#endif #endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 //拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
//此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 //此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优
auto hls = _hls; auto hls = _hls;
if (hls) { if (hls) {
hls->inputFrame(frame); hls->inputFrame(frame);
} }
auto mp4 = _mp4; auto mp4 = _mp4;
if (mp4) { if (mp4) {
mp4->inputFrame(frame); mp4->inputFrame(frame);
} }
} }
static string getTrackInfoStr(const TrackSource *track_src){ static string getTrackInfoStr(const TrackSource *track_src){
_StrPrinter codec_info; _StrPrinter codec_info;
auto tracks = track_src->getTracks(true); auto tracks = track_src->getTracks(true);
for (auto &track : tracks) { for (auto &track : tracks) {
auto codec_type = track->getTrackType(); auto codec_type = track->getTrackType();
codec_info << track->getCodecName(); codec_info << track->getCodecName();
switch (codec_type) { switch (codec_type) {
case TrackAudio : { case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track); auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "[" codec_info << "["
<< audio_track->getAudioSampleRate() << "/" << audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/" << audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] "; << audio_track->getAudioSampleBit() << "] ";
break; break;
} }
case TrackVideo : { case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track); auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "[" codec_info << "["
<< video_track->getVideoWidth() << "/" << video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/" << video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] "; << round(video_track->getVideoFps()) << "] ";
break; break;
} }
default: default:
break; break;
} }
} }
return codec_info; return codec_info;
} }
void MultiMuxerPrivate::onAllTrackReady() { void MultiMuxerPrivate::onAllTrackReady() {
if (_rtmp) { if (_rtmp) {
_rtmp->onAllTrackReady(); _rtmp->onAllTrackReady();
} }
if (_rtsp) { if (_rtsp) {
_rtsp->onAllTrackReady(); _rtsp->onAllTrackReady();
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->onAllTrackReady(); _fmp4->onAllTrackReady();
} }
#endif #endif
if (_track_listener) { if (_track_listener) {
_track_listener->onAllTrackReady(); _track_listener->onAllTrackReady();
} }
InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this); InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
} }
///////////////////////////////MultiMediaSourceMuxer////////////////////////////////// ///////////////////////////////MultiMediaSourceMuxer//////////////////////////////////
MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {} MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec, MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4)); _muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4));
_muxer->setTrackListener(this); _muxer->setTrackListener(this);
} }
void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) { void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
setDelegate(listener); setDelegate(listener);
//拦截事件 //拦截事件
_muxer->setMediaListener(shared_from_this()); _muxer->setMediaListener(shared_from_this());
} }
void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener) { void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener) {
_track_listener = listener; _track_listener = listener;
} }
int MultiMediaSourceMuxer::totalReaderCount() const { int MultiMediaSourceMuxer::totalReaderCount() const {
return _muxer->totalReaderCount(); return _muxer->totalReaderCount();
} }
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) { void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
_muxer->setTimeStamp(stamp); _muxer->setTimeStamp(stamp);
} }
vector<Track::Ptr> MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const { vector<Track::Ptr> MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const {
return _muxer->getTracks(trackReady); return _muxer->getTracks(trackReady);
} }
int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
auto listener = getDelegate(); auto listener = getDelegate();
if (!listener) { if (!listener) {
return totalReaderCount(); return totalReaderCount();
} }
return listener->totalReaderCount(sender); return listener->totalReaderCount(sender);
} }
bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
return _muxer->setupRecord(sender, type, start, custom_path); return _muxer->setupRecord(sender, type, start, custom_path);
} }
bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) { bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) {
return _muxer->isRecording(sender,type); return _muxer->isRecording(sender,type);
} }
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb){
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data())); RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data()));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this(); weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) { rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](const SockException &ex) {
cb(ex); cb(ex);
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self || ex) { if (!strong_self || ex) {
return; return;
} }
for (auto &track : strong_self->_muxer->getTracks(false)) { for (auto &track : strong_self->_muxer->getTracks(false)) {
rtp_sender->addTrack(track); rtp_sender->addTrack(track);
} }
rtp_sender->addTrackCompleted(); rtp_sender->addTrackCompleted();
strong_self->_rtp_sender = rtp_sender; strong_self->_rtp_sender[ssrc] = rtp_sender;
}); });
#else #else
cb(SockException(Err_other, "该功能未启用编译时请打开ENABLE_RTPPROXY宏")); cb(SockException(Err_other, "该功能未启用编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY #endif//ENABLE_RTPPROXY
} }
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
if (_rtp_sender) { map<string, RtpSender::Ptr>::iterator ite = _rtp_sender.find(ssrc);
_rtp_sender = nullptr; if (ite != _rtp_sender.end())
return true; {
} ite->second = nullptr;
#endif//ENABLE_RTPPROXY _rtp_sender.erase(ite);
return false; return true;
} }
#endif//ENABLE_RTPPROXY
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { return false;
_muxer->addTrack(track); }
}
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
void MultiMediaSourceMuxer::addTrackCompleted() { _muxer->addTrack(track);
_muxer->addTrackCompleted(); }
}
void MultiMediaSourceMuxer::addTrackCompleted() {
void MultiMediaSourceMuxer::onAllTrackReady(){ _muxer->addTrackCompleted();
_muxer->setMediaListener(shared_from_this()); }
auto listener = _track_listener.lock();
if(listener){ void MultiMediaSourceMuxer::onAllTrackReady(){
listener->onAllTrackReady(); _muxer->setMediaListener(shared_from_this());
} auto listener = _track_listener.lock();
} if(listener){
listener->onAllTrackReady();
void MultiMediaSourceMuxer::resetTracks() { }
_muxer->resetTracks(); }
}
void MultiMediaSourceMuxer::resetTracks() {
//该类实现frame级别的时间戳覆盖 _muxer->resetTracks();
class FrameModifyStamp : public Frame{ }
public:
typedef std::shared_ptr<FrameModifyStamp> Ptr; //该类实现frame级别的时间戳覆盖
FrameModifyStamp(const Frame::Ptr &frame, Stamp &stamp){ class FrameModifyStamp : public Frame{
_frame = frame; public:
//覆盖时间戳 typedef std::shared_ptr<FrameModifyStamp> Ptr;
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, true); FrameModifyStamp(const Frame::Ptr &frame, Stamp &stamp){
} _frame = frame;
~FrameModifyStamp() override {} //覆盖时间戳
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, true);
uint32_t dts() const override{ }
return _dts; ~FrameModifyStamp() override {}
}
uint32_t dts() const override{
uint32_t pts() const override{ return _dts;
return _pts; }
}
uint32_t pts() const override{
uint32_t prefixSize() const override { return _pts;
return _frame->prefixSize(); }
}
uint32_t prefixSize() const override {
bool keyFrame() const override { return _frame->prefixSize();
return _frame->keyFrame(); }
}
bool keyFrame() const override {
bool configFrame() const override { return _frame->keyFrame();
return _frame->configFrame(); }
}
bool configFrame() const override {
bool cacheAble() const override { return _frame->configFrame();
return _frame->cacheAble(); }
}
bool cacheAble() const override {
char *data() const override { return _frame->cacheAble();
return _frame->data(); }
}
char *data() const override {
uint32_t size() const override { return _frame->data();
return _frame->size(); }
}
uint32_t size() const override {
CodecId getCodecId() const override { return _frame->size();
return _frame->getCodecId(); }
}
private: CodecId getCodecId() const override {
int64_t _dts; return _frame->getCodecId();
int64_t _pts; }
Frame::Ptr _frame; private:
}; int64_t _dts;
int64_t _pts;
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { Frame::Ptr _frame;
GET_CONFIG(bool, modify_stamp, General::kModifyStamp); };
auto frame = frame_in;
if (modify_stamp) { void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
//开启了时间戳覆盖 GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]); auto frame = frame_in;
} if (modify_stamp) {
_muxer->inputFrame(frame); //开启了时间戳覆盖
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
#if defined(ENABLE_RTPPROXY) }
auto rtp_sender = _rtp_sender; _muxer->inputFrame(frame);
if (rtp_sender) {
rtp_sender->inputFrame(frame); #if defined(ENABLE_RTPPROXY)
} map<string, RtpSender::Ptr>::iterator ite = _rtp_sender.begin();
#endif //ENABLE_RTPPROXY while (ite != _rtp_sender.end())
{
} if (ite->second)
{
bool MultiMediaSourceMuxer::isEnabled(){ ite->second->inputFrame(frame);
GET_CONFIG(uint32_t, stream_none_reader_delay_ms, General::kStreamNoneReaderDelayMS); }
if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) { ite++;
//无人观看时,每次检查是否真的无人观看 }
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) #endif //ENABLE_RTPPROXY
#if defined(ENABLE_RTPPROXY)
_is_enable = (_muxer->isEnabled() || _rtp_sender); }
#else
_is_enable = _muxer->isEnabled(); bool MultiMediaSourceMuxer::isEnabled(){
#endif //ENABLE_RTPPROXY GET_CONFIG(uint32_t, stream_none_reader_delay_ms, General::kStreamNoneReaderDelayMS);
if (_is_enable) { if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) {
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍所以刷新计数器无意义且浪费cpu //无人观看时,每次检查是否真的无人观看
_last_check.resetTime(); //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
} #if defined(ENABLE_RTPPROXY)
} _is_enable = (_muxer->isEnabled() || _rtp_sender.size());
return _is_enable; #else
} _is_enable = _muxer->isEnabled();
#endif //ENABLE_RTPPROXY
if (_is_enable) {
}//namespace mediakit //无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍所以刷新计数器无意义且浪费cpu
_last_check.resetTime();
}
}
return _is_enable;
}
}//namespace mediakit

View File

@ -1,197 +1,197 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#ifndef ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H #ifndef ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H #define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#include "Common/Stamp.h" #include "Common/Stamp.h"
#include "Rtp/RtpSender.h" #include "Rtp/RtpSender.h"
#include "Record/Recorder.h" #include "Record/Recorder.h"
#include "Record/HlsRecorder.h" #include "Record/HlsRecorder.h"
#include "Record/HlsMediaSource.h" #include "Record/HlsMediaSource.h"
#include "Rtsp/RtspMediaSourceMuxer.h" #include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h" #include "Rtmp/RtmpMediaSourceMuxer.h"
#include "TS/TSMediaSourceMuxer.h" #include "TS/TSMediaSourceMuxer.h"
#include "FMP4/FMP4MediaSourceMuxer.h" #include "FMP4/FMP4MediaSourceMuxer.h"
namespace mediakit{ namespace mediakit{
class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{ class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{
public: public:
friend class MultiMediaSourceMuxer; friend class MultiMediaSourceMuxer;
typedef std::shared_ptr<MultiMuxerPrivate> Ptr; typedef std::shared_ptr<MultiMuxerPrivate> Ptr;
class Listener{ class Listener{
public: public:
Listener() = default; Listener() = default;
virtual ~Listener() = default; virtual ~Listener() = default;
virtual void onAllTrackReady() = 0; virtual void onAllTrackReady() = 0;
}; };
~MultiMuxerPrivate() override; ~MultiMuxerPrivate() override;
private: private:
MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec, MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4); bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4);
void resetTracks() override; void resetTracks() override;
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener); void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
int totalReaderCount() const; int totalReaderCount() const;
void setTimeStamp(uint32_t stamp); void setTimeStamp(uint32_t stamp);
void setTrackListener(Listener *listener); void setTrackListener(Listener *listener);
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path); bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path);
bool isRecording(MediaSource &sender, Recorder::type type); bool isRecording(MediaSource &sender, Recorder::type type);
bool isEnabled(); bool isEnabled();
void onTrackReady(const Track::Ptr & track) override; void onTrackReady(const Track::Ptr & track) override;
void onTrackFrame(const Frame::Ptr &frame) override; void onTrackFrame(const Frame::Ptr &frame) override;
void onAllTrackReady() override; void onAllTrackReady() override;
private: private:
string _stream_url; string _stream_url;
Listener *_track_listener = nullptr; Listener *_track_listener = nullptr;
RtmpMediaSourceMuxer::Ptr _rtmp; RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp; RtspMediaSourceMuxer::Ptr _rtsp;
HlsRecorder::Ptr _hls; HlsRecorder::Ptr _hls;
MediaSinkInterface::Ptr _mp4; MediaSinkInterface::Ptr _mp4;
TSMediaSourceMuxer::Ptr _ts; TSMediaSourceMuxer::Ptr _ts;
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4; FMP4MediaSourceMuxer::Ptr _fmp4;
#endif #endif
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
}; };
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this<MultiMediaSourceMuxer>{ class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public: public:
typedef MultiMuxerPrivate::Listener Listener; typedef MultiMuxerPrivate::Listener Listener;
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr; typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
~MultiMediaSourceMuxer() override; ~MultiMediaSourceMuxer() override;
MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0, MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0,
bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false); bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false);
/** /**
* *
* @param listener * @param listener
*/ */
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener); void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
/** /**
* Track就绪事件监听器 * Track就绪事件监听器
* @param listener * @param listener
*/ */
void setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener); void setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener);
/** /**
* *
*/ */
int totalReaderCount() const; int totalReaderCount() const;
/** /**
* () * ()
*/ */
bool isEnabled(); bool isEnabled();
/** /**
* MediaSource时间戳 * MediaSource时间戳
* @param stamp * @param stamp
*/ */
void setTimeStamp(uint32_t stamp); void setTimeStamp(uint32_t stamp);
/////////////////////////////////MediaSourceEvent override///////////////////////////////// /////////////////////////////////MediaSourceEvent override/////////////////////////////////
/** /**
* Track * Track
* @param trackReady track * @param trackReady track
* @return Track * @return Track
*/ */
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override; vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
/** /**
* *
* @param sender * @param sender
* @return * @return
*/ */
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
/** /**
* *
* @param type * @param type
* @param start * @param start
* @param custom_path * @param custom_path
* @return * @return
*/ */
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
/** /**
* *
* @param type * @param type
* @return * @return
*/ */
bool isRecording(MediaSource &sender, Recorder::type type) override; bool isRecording(MediaSource &sender, Recorder::type type) override;
/** /**
* ps-rtp流 * ps-rtp流
* @param dst_url ip或域名 * @param dst_url ip或域名
* @param dst_port * @param dst_port
* @param ssrc rtp的ssrc * @param ssrc rtp的ssrc
* @param is_udp udp * @param is_udp udp
* @param cb * @param cb
*/ */
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override; void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) override;
/** /**
* ps-rtp发送 * ps-rtp发送
* @return * @return
*/ */
bool stopSendRtp(MediaSource &sender) override; bool stopSendRtp(MediaSource &sender, const string &ssrc) override;
/////////////////////////////////MediaSinkInterface override///////////////////////////////// /////////////////////////////////MediaSinkInterface override/////////////////////////////////
/** /**
* trackTrack的clone方法 * trackTrack的clone方法
* sps pps这些信息 Delegate相关关系 * sps pps这些信息 Delegate相关关系
* @param track * @param track
*/ */
void addTrack(const Track::Ptr &track) override; void addTrack(const Track::Ptr &track) override;
/** /**
* track完毕 * track完毕
*/ */
void addTrackCompleted() override; void addTrackCompleted() override;
/** /**
* track * track
*/ */
void resetTracks() override; void resetTracks() override;
/** /**
* *
* @param frame * @param frame
*/ */
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
/////////////////////////////////MultiMuxerPrivate::Listener override///////////////////////////////// /////////////////////////////////MultiMuxerPrivate::Listener override/////////////////////////////////
/** /**
* track全部就绪 * track全部就绪
*/ */
void onAllTrackReady() override; void onAllTrackReady() override;
private: private:
bool _is_enable = false; bool _is_enable = false;
Ticker _last_check; Ticker _last_check;
Stamp _stamp[2]; Stamp _stamp[2];
MultiMuxerPrivate::Ptr _muxer; MultiMuxerPrivate::Ptr _muxer;
std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener; std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
RtpSender::Ptr _rtp_sender; map<string, RtpSender::Ptr> _rtp_sender;
#endif //ENABLE_RTPPROXY #endif //ENABLE_RTPPROXY
}; };
}//namespace mediakit }//namespace mediakit
#endif //ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H #endif //ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H

View File

@ -1,164 +1,165 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "RtpSender.h" #include "RtpSender.h"
#include "Rtsp/RtspSession.h" #include "Rtsp/RtspSession.h"
#include "Thread/WorkThreadPool.h" #include "Thread/WorkThreadPool.h"
#include "RtpCache.h" #include "RtpCache.h"
namespace mediakit{ namespace mediakit{
RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) {
_poller = EventPollerPool::Instance().getPoller(); _poller = EventPollerPool::Instance().getPoller();
_interface = std::make_shared<RtpCachePS>([this](std::shared_ptr<List<Buffer::Ptr> > list) { _interface = std::make_shared<RtpCachePS>([this](std::shared_ptr<List<Buffer::Ptr> > list) {
onFlushRtpList(std::move(list)); onFlushRtpList(std::move(list));
}, ssrc, payload_type); }, ssrc, payload_type);
} }
RtpSender::~RtpSender() { RtpSender::~RtpSender() {
} }
void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb){ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb){
_is_udp = is_udp; _is_udp = is_udp;
_socket = Socket::createSocket(_poller, false); _socket = Socket::createSocket(_poller, false);
_dst_url = dst_url; _dst_url = dst_url;
_dst_port = dst_port; _dst_port = dst_port;
weak_ptr<RtpSender> weak_self = shared_from_this(); _src_port = src_port;
if (is_udp) { weak_ptr<RtpSender> weak_self = shared_from_this();
_socket->bindUdpSock(0); if (is_udp) {
auto poller = _poller; _socket->bindUdpSock(src_port);
WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() { auto poller = _poller;
struct sockaddr addr; WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() {
//切换线程目的是为了dns解析放在后台线程执行 struct sockaddr addr;
if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) { //切换线程目的是为了dns解析放在后台线程执行
poller->async([dst_url, cb]() { if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) {
//切回自己的线程 poller->async([dst_url, cb]() {
cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); //切回自己的线程
}); cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url));
return; });
} return;
}
//dns解析成功
poller->async([addr, weak_self, cb]() { //dns解析成功
//切回自己的线程 poller->async([addr, weak_self, cb]() {
cb(SockException()); //切回自己的线程
auto strong_self = weak_self.lock(); cb(SockException());
if (strong_self) { auto strong_self = weak_self.lock();
strong_self->_socket->setSendPeerAddr(&addr); if (strong_self) {
strong_self->onConnect(); strong_self->_socket->setSendPeerAddr(&addr);
} strong_self->onConnect();
}); }
}); });
} else { });
_socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) { } else {
cb(err); _socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) {
auto strong_self = weak_self.lock(); cb(err);
if (strong_self && !err) { auto strong_self = weak_self.lock();
//tcp连接成功 if (strong_self && !err) {
strong_self->onConnect(); //tcp连接成功
} strong_self->onConnect();
}); }
} }, 5.0F, "0.0.0.0", src_port);
} }
}
void RtpSender::onConnect(){
_is_connect = true; void RtpSender::onConnect(){
//加大发送缓存,防止udp丢包之类的问题 _is_connect = true;
SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); //加大发送缓存,防止udp丢包之类的问题
if (!_is_udp) { SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024);
//关闭tcp no_delay并开启MSG_MORE, 提高发送性能 if (!_is_udp) {
SockUtil::setNoDelay(_socket->rawFD(), false); //关闭tcp no_delay并开启MSG_MORE, 提高发送性能
_socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); SockUtil::setNoDelay(_socket->rawFD(), false);
} _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
//连接建立成功事件 }
weak_ptr<RtpSender> weak_self = shared_from_this(); //连接建立成功事件
_socket->setOnErr([weak_self](const SockException &err) { weak_ptr<RtpSender> weak_self = shared_from_this();
auto strong_self = weak_self.lock(); _socket->setOnErr([weak_self](const SockException &err) {
if (strong_self) { auto strong_self = weak_self.lock();
strong_self->onErr(err); if (strong_self) {
} strong_self->onErr(err);
}); }
InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; });
} InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp;
}
void RtpSender::addTrack(const Track::Ptr &track){
_interface->addTrack(track); void RtpSender::addTrack(const Track::Ptr &track){
} _interface->addTrack(track);
}
void RtpSender::addTrackCompleted(){
_interface->addTrackCompleted(); void RtpSender::addTrackCompleted(){
} _interface->addTrackCompleted();
}
void RtpSender::resetTracks(){
_interface->resetTracks(); void RtpSender::resetTracks(){
} _interface->resetTracks();
}
//此函数在其他线程执行
void RtpSender::inputFrame(const Frame::Ptr &frame) { //此函数在其他线程执行
if (_is_connect) { void RtpSender::inputFrame(const Frame::Ptr &frame) {
//连接成功后才做实质操作(节省cpu资源) if (_is_connect) {
_interface->inputFrame(frame); //连接成功后才做实质操作(节省cpu资源)
} _interface->inputFrame(frame);
} }
}
//此函数在其他线程执行
void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) { //此函数在其他线程执行
if(!_is_connect){ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
//连接成功后才能发送数据 if(!_is_connect){
return; //连接成功后才能发送数据
} return;
}
auto is_udp = _is_udp;
auto socket = _socket; auto is_udp = _is_udp;
_poller->async([rtp_list, is_udp, socket]() { auto socket = _socket;
int i = 0; _poller->async([rtp_list, is_udp, socket]() {
int size = rtp_list->size(); int i = 0;
rtp_list->for_each([&](Buffer::Ptr &packet) { int size = rtp_list->size();
if (is_udp) { rtp_list->for_each([&](Buffer::Ptr &packet) {
//udp模式rtp over tcp前4个字节可以忽略 if (is_udp) {
socket->send(std::make_shared<BufferRtp>(std::move(packet), 4), nullptr, 0, ++i == size); //udp模式rtp over tcp前4个字节可以忽略
} else { socket->send(std::make_shared<BufferRtp>(std::move(packet), 4), nullptr, 0, ++i == size);
//tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 } else {
socket->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size); //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
} socket->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
}); }
}); });
} });
}
void RtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false; void RtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false;
//监听socket断开事件方便重连
if (is_connect) { //监听socket断开事件方便重连
WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what(); if (is_connect) {
} else { WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what();
WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); } else {
} WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what();
}
weak_ptr<RtpSender> weak_self = shared_from_this();
_connect_timer = std::make_shared<Timer>(10.0, [weak_self]() { weak_ptr<RtpSender> weak_self = shared_from_this();
auto strong_self = weak_self.lock(); _connect_timer = std::make_shared<Timer>(10.0, [weak_self]() {
if (!strong_self) { auto strong_self = weak_self.lock();
return false; if (!strong_self) {
} return false;
strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){ }
auto strong_self = weak_self.lock(); strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](const SockException &ex){
if (strong_self && ex) { auto strong_self = weak_self.lock();
//连接失败且本对象未销毁,那么重试连接 if (strong_self && ex) {
strong_self->onErr(ex, true); //连接失败且本对象未销毁,那么重试连接
} strong_self->onErr(ex, true);
}); }
return false; });
}, _poller); return false;
} }, _poller);
}
}//namespace mediakit
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY) #endif// defined(ENABLE_RTPPROXY)

View File

@ -1,85 +1,86 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#ifndef ZLMEDIAKIT_RTPSENDER_H #ifndef ZLMEDIAKIT_RTPSENDER_H
#define ZLMEDIAKIT_RTPSENDER_H #define ZLMEDIAKIT_RTPSENDER_H
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h" #include "PSEncoder.h"
#include "Extension/CommonRtp.h" #include "Extension/CommonRtp.h"
namespace mediakit{ namespace mediakit{
//rtp发送客户端支持发送GB28181协议 //rtp发送客户端支持发送GB28181协议
class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{
public: public:
typedef std::shared_ptr<RtpSender> Ptr; typedef std::shared_ptr<RtpSender> Ptr;
~RtpSender() override; ~RtpSender() override;
/** /**
* GB28181 RTP发送客户端 * GB28181 RTP发送客户端
* @param ssrc rtp的ssrc * @param ssrc rtp的ssrc
* @param payload_type ps-rtp的pt一般为96 * @param payload_type ps-rtp的pt一般为96
*/ */
RtpSender(uint32_t ssrc, uint8_t payload_type = 96); RtpSender(uint32_t ssrc, uint8_t payload_type = 96);
/** /**
* ps-rtp包 * ps-rtp包
* @param dst_url ip或域名 * @param dst_url ip或域名
* @param dst_port * @param dst_port
* @param is_udp udp方式发送rtp * @param is_udp udp方式发送rtp
* @param cb * @param cb
*/ */
void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb); void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb);
/** /**
* *
*/ */
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
/** /**
* trackTrack的clone方法 * trackTrack的clone方法
* sps pps这些信息 Delegate相关关系 * sps pps这些信息 Delegate相关关系
* @param track * @param track
*/ */
virtual void addTrack(const Track::Ptr & track) override; virtual void addTrack(const Track::Ptr & track) override;
/** /**
* Track完毕 * Track完毕
*/ */
virtual void addTrackCompleted() override; virtual void addTrackCompleted() override;
/** /**
* track * track
*/ */
virtual void resetTracks() override; virtual void resetTracks() override;
private: private:
//合并写输出 //合并写输出
void onFlushRtpList(std::shared_ptr<List<Buffer::Ptr> > rtp_list); void onFlushRtpList(std::shared_ptr<List<Buffer::Ptr> > rtp_list);
//udp/tcp连接成功回调 //udp/tcp连接成功回调
void onConnect(); void onConnect();
//异常断开socket事件 //异常断开socket事件
void onErr(const SockException &ex, bool is_connect = false); void onErr(const SockException &ex, bool is_connect = false);
private: private:
bool _is_udp; bool _is_udp;
bool _is_connect = false; bool _is_connect = false;
string _dst_url; string _dst_url;
uint16_t _dst_port; uint16_t _dst_port;
Socket::Ptr _socket; uint16_t _src_port;
EventPoller::Ptr _poller; Socket::Ptr _socket;
Timer::Ptr _connect_timer; EventPoller::Ptr _poller;
MediaSinkInterface::Ptr _interface; Timer::Ptr _connect_timer;
}; MediaSinkInterface::Ptr _interface;
};
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY) }//namespace mediakit
#endif //ZLMEDIAKIT_RTPSENDER_H #endif// defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_RTPSENDER_H

View File

@ -1,94 +1,102 @@
/* /*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
* *
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
* *
* Use of this source code is governed by MIT license that can be found in the * Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors * LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "RtpServer.h" #include "RtpServer.h"
#include "RtpSelector.h" #include "RtpSelector.h"
namespace mediakit{ namespace mediakit{
RtpServer::RtpServer() { RtpServer::RtpServer() {
} }
RtpServer::~RtpServer() { RtpServer::~RtpServer() {
if(_on_clearup){ if(_on_clearup){
_on_clearup(); _on_clearup();
} }
} }
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
//创建udp服务器 //创建udp服务器
Socket::Ptr udp_server = Socket::createSocket(nullptr, false); Socket::Ptr udp_server = Socket::createSocket(nullptr, false);
if (local_port == 0) { if (local_port == 0) {
//随机端口rtp端口采用偶数 //随机端口rtp端口采用偶数
Socket::Ptr rtcp_server = Socket::createSocket(nullptr, false); Socket::Ptr rtcp_server = Socket::createSocket(nullptr, false);
auto pair = std::make_pair(udp_server, rtcp_server); auto pair = std::make_pair(udp_server, rtcp_server);
makeSockPair(pair, local_ip); makeSockPair(pair, local_ip);
//取偶数端口 //取偶数端口
udp_server = pair.first; udp_server = pair.first;
} else if (!udp_server->bindUdpSock(local_port, local_ip)) { } else if (!udp_server->bindUdpSock(local_port, local_ip)) {
//用户指定端口 //用户指定端口
throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true));
} }
//设置udp socket读缓存 //设置udp socket读缓存
SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024); SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024);
TcpServer::Ptr tcp_server; TcpServer::Ptr tcp_server;
if (enable_tcp) { if (enable_tcp) {
//创建tcp服务器 //创建tcp服务器
tcp_server = std::make_shared<TcpServer>(udp_server->getPoller()); tcp_server = std::make_shared<TcpServer>(udp_server->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kStreamID] = stream_id;
tcp_server->start<RtpSession>(udp_server->get_local_port(), local_ip); tcp_server->start<RtpSession>(udp_server->get_local_port(), local_ip);
} }
RtpProcess::Ptr process; RtpProcess::Ptr process;
if (!stream_id.empty()) { if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
process = RtpSelector::Instance().getProcess(stream_id, true); process = RtpSelector::Instance().getProcess(stream_id, true);
udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { //udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
process->inputRtp(true, udp_server, buf->data(), buf->size(), addr); // process->inputRtp(true, udp_server, buf->data(), buf->size(), addr);
}); //});
} else { weak_ptr<Socket> weak_sock = udp_server;
//未指定流id一个端口多个流通过ssrc来分流 udp_server->setOnRead([weak_sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
auto &ref = RtpSelector::Instance(); process->inputRtp(true, weak_sock.lock(), buf->data(), buf->size(), addr);
udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) { });
ref.inputRtp(udp_server, buf->data(), buf->size(), addr); } else {
}); //未指定流id一个端口多个流通过ssrc来分流
} auto &ref = RtpSelector::Instance();
//udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
_on_clearup = [udp_server, process, stream_id]() { // ref.inputRtp(udp_server, buf->data(), buf->size(), addr);
//去除循环引用 //});
udp_server->setOnRead(nullptr); weak_ptr<Socket> weak_sock = udp_server;
if (process) { udp_server->setOnRead([&ref, weak_sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
//删除rtp处理器 ref.inputRtp(weak_sock.lock(), buf->data(), buf->size(), addr);
RtpSelector::Instance().delProcess(stream_id, process.get()); });
} }
};
_on_clearup = [udp_server, process, stream_id]() {
_tcp_server = tcp_server; //去除循环引用
_udp_server = udp_server; //udp_server->setOnRead(nullptr);
_rtp_process = process; if (process) {
} //删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
void RtpServer::setOnDetach(const function<void()> &cb){ }
if(_rtp_process){ };
_rtp_process->setOnDetach(cb);
} _tcp_server = tcp_server;
} _udp_server = udp_server;
_rtp_process = process;
EventPoller::Ptr RtpServer::getPoller() { }
return _udp_server->getPoller();
} void RtpServer::setOnDetach(const function<void()> &cb){
if(_rtp_process){
uint16_t RtpServer::getPort() { _rtp_process->setOnDetach(cb);
return _udp_server ? _udp_server->get_local_port() : 0; }
} }
}//namespace mediakit EventPoller::Ptr RtpServer::getPoller() {
return _udp_server->getPoller();
}
uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->get_local_port() : 0;
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)