增加多路RTP视频流输出

This commit is contained in:
hewenyuan 2020-11-27 17:19:55 +08:00
parent f7433b0f90
commit 50927548e9
9 changed files with 3449 additions and 3431 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,170 +1,170 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "MediaSink.h"
//最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track
#define MAX_WAIT_MS_READY 10000
//如果添加Track最多等待5秒
#define MAX_WAIT_MS_ADD_TRACK 5000
namespace mediakit{
void MediaSink::addTrack(const Track::Ptr &track_in) {
lock_guard<recursive_mutex> lck(_mtx);
if (_all_track_ready) {
WarnL << "all track is ready, add this track too late!";
return;
}
//克隆Track只拷贝其数据不拷贝其数据转发关系
auto track = track_in->clone();
auto codec_id = track->getCodecId();
_track_map[codec_id] = track;
_track_ready_callback[codec_id] = [this, track]() {
onTrackReady(track);
};
_ticker.resetTime();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if (_all_track_ready) {
onTrackFrame(frame);
} else {
//还有Track未就绪先缓存之
_frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame));
}
}));
}
void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx);
_all_track_ready = false;
_track_map.clear();
_track_ready_callback.clear();
_ticker.resetTime();
_max_track_size = 2;
_frame_unread.clear();
}
void MediaSink::inputFrame(const Frame::Ptr &frame) {
lock_guard<recursive_mutex> lck(_mtx);
auto it = _track_map.find(frame->getCodecId());
if (it == _track_map.end()) {
return;
}
it->second->inputFrame(frame);
checkTrackIfReady(nullptr);
}
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
//Track由未就绪状态转换成就绪状态我们就触发onTrackReady回调
auto it_callback = _track_ready_callback.find(track->getCodecId());
if (it_callback != _track_ready_callback.end() && track->ready()) {
it_callback->second();
_track_ready_callback.erase(it_callback);
}
}
void MediaSink::checkTrackIfReady(const Track::Ptr &track){
if (!_all_track_ready && !_track_ready_callback.empty()) {
if (track) {
checkTrackIfReady_l(track);
} else {
for (auto &pr : _track_map) {
checkTrackIfReady_l(pr.second);
}
}
}
if(!_all_track_ready){
if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
//如果超过规定时间那么不再等待并忽略未准备好的Track
emitAllTrackReady();
return;
}
if(!_track_ready_callback.empty()){
//在超时时间内如果存在未准备好的Track那么继续等待
return;
}
if(_track_map.size() == _max_track_size){
//如果已经添加了音视频Track并且不存在未准备好的Track那么说明所有Track都准备好了
emitAllTrackReady();
return;
}
if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
//如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track)
emitAllTrackReady();
return;
}
}
}
void MediaSink::addTrackCompleted(){
lock_guard<recursive_mutex> lck(_mtx);
_max_track_size = _track_map.size();
checkTrackIfReady(nullptr);
}
void MediaSink::emitAllTrackReady() {
if (_all_track_ready) {
return;
}
DebugL << "all track ready use " << _ticker.elapsedTime() << "ms";
if (!_track_ready_callback.empty()) {
//这是超时强制忽略未准备好的Track
_track_ready_callback.clear();
//移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) {
WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName();
it = _track_map.erase(it);
continue;
}
++it;
}
}
if (!_track_map.empty()) {
//最少有一个有效的Track
_all_track_ready = true;
onAllTrackReady();
//全部Track就绪我们一次性把之前的帧输出
for(auto &pr : _frame_unread){
if (_track_map.find(pr.first) == _track_map.end()) {
//该Track已经被移除
continue;
}
pr.second.for_each([&](const Frame::Ptr &frame) {
onTrackFrame(frame);
});
}
_frame_unread.clear();
}
}
vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{
vector<Track::Ptr> ret;
lock_guard<recursive_mutex> lck(_mtx);
for (auto &pr : _track_map){
if(trackReady && !pr.second->ready()){
continue;
}
ret.emplace_back(pr.second);
}
return ret;
}
}//namespace mediakit
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "MediaSink.h"
//最多等待未初始化的Track 10秒超时之后会忽略未初始化的Track
#define MAX_WAIT_MS_READY 10000
//如果添加Track最多等待5秒
#define MAX_WAIT_MS_ADD_TRACK 1000
namespace mediakit{
void MediaSink::addTrack(const Track::Ptr &track_in) {
lock_guard<recursive_mutex> lck(_mtx);
if (_all_track_ready) {
WarnL << "all track is ready, add this track too late!";
return;
}
//克隆Track只拷贝其数据不拷贝其数据转发关系
auto track = track_in->clone();
auto codec_id = track->getCodecId();
_track_map[codec_id] = track;
_track_ready_callback[codec_id] = [this, track]() {
onTrackReady(track);
};
_ticker.resetTime();
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if (_all_track_ready) {
onTrackFrame(frame);
} else {
//还有Track未就绪先缓存之
_frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame));
}
}));
}
void MediaSink::resetTracks() {
lock_guard<recursive_mutex> lck(_mtx);
_all_track_ready = false;
_track_map.clear();
_track_ready_callback.clear();
_ticker.resetTime();
_max_track_size = 2;
_frame_unread.clear();
}
void MediaSink::inputFrame(const Frame::Ptr &frame) {
lock_guard<recursive_mutex> lck(_mtx);
auto it = _track_map.find(frame->getCodecId());
if (it == _track_map.end()) {
return;
}
it->second->inputFrame(frame);
checkTrackIfReady(nullptr);
}
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
//Track由未就绪状态转换成就绪状态我们就触发onTrackReady回调
auto it_callback = _track_ready_callback.find(track->getCodecId());
if (it_callback != _track_ready_callback.end() && track->ready()) {
it_callback->second();
_track_ready_callback.erase(it_callback);
}
}
void MediaSink::checkTrackIfReady(const Track::Ptr &track){
if (!_all_track_ready && !_track_ready_callback.empty()) {
if (track) {
checkTrackIfReady_l(track);
} else {
for (auto &pr : _track_map) {
checkTrackIfReady_l(pr.second);
}
}
}
if(!_all_track_ready){
if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
//如果超过规定时间那么不再等待并忽略未准备好的Track
emitAllTrackReady();
return;
}
if(!_track_ready_callback.empty()){
//在超时时间内如果存在未准备好的Track那么继续等待
return;
}
if(_track_map.size() == _max_track_size){
//如果已经添加了音视频Track并且不存在未准备好的Track那么说明所有Track都准备好了
emitAllTrackReady();
return;
}
if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
//如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track)
emitAllTrackReady();
return;
}
}
}
void MediaSink::addTrackCompleted(){
lock_guard<recursive_mutex> lck(_mtx);
_max_track_size = _track_map.size();
checkTrackIfReady(nullptr);
}
void MediaSink::emitAllTrackReady() {
if (_all_track_ready) {
return;
}
DebugL << "all track ready use " << _ticker.elapsedTime() << "ms";
if (!_track_ready_callback.empty()) {
//这是超时强制忽略未准备好的Track
_track_ready_callback.clear();
//移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) {
WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName();
it = _track_map.erase(it);
continue;
}
++it;
}
}
if (!_track_map.empty()) {
//最少有一个有效的Track
_all_track_ready = true;
onAllTrackReady();
//全部Track就绪我们一次性把之前的帧输出
for(auto &pr : _frame_unread){
if (_track_map.find(pr.first) == _track_map.end()) {
//该Track已经被移除
continue;
}
pr.second.for_each([&](const Frame::Ptr &frame) {
onTrackFrame(frame);
});
}
_frame_unread.clear();
}
}
vector<Track::Ptr> MediaSink::getTracks(bool trackReady) const{
vector<Track::Ptr> ret;
lock_guard<recursive_mutex> lck(_mtx);
for (auto &pr : _track_map){
if(trackReady && !pr.second->ready()){
continue;
}
ret.emplace_back(pr.second);
}
return ret;
}
}//namespace mediakit

File diff suppressed because it is too large Load Diff

View File

@ -1,361 +1,361 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H
#include <mutex>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
using namespace std;
using namespace toolkit;
namespace toolkit{
class TcpSession;
}// namespace toolkit
namespace mediakit {
enum class MediaOriginType : uint8_t {
unknown = 0,
rtmp_push ,
rtsp_push,
rtp_push,
pull,
ffmpeg_pull,
mp4_vod,
device_chn
};
string getOriginTypeString(MediaOriginType type);
class MediaSource;
class MediaSourceEvent{
public:
friend class MediaSource;
MediaSourceEvent(){};
virtual ~MediaSourceEvent(){};
// 获取媒体源类型
virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; }
// 获取媒体源url或者文件路径
virtual string getOriginUrl(MediaSource &sender) const { return ""; }
// 获取媒体源客户端相关信息
virtual std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const { return nullptr; }
// 通知拖动进度条
virtual bool seekTo(MediaSource &sender, uint32_t stamp) { return false; }
// 通知其停止产生流
virtual bool close(MediaSource &sender, bool force) { return false; }
// 获取观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0;
// 通知观看人数变化
virtual void onReaderChanged(MediaSource &sender, int size);
//流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {};
////////////////////////仅供MultiMediaSourceMuxer对象继承////////////////////////
// 开启或关闭录制
virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; };
// 获取录制状态
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
// 获取所有track相关信息
virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); };
// 开始发送ps-rtp
virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));};
// 停止发送ps-rtp
virtual bool stopSendRtp(MediaSource &sender) {return false; }
private:
Timer::Ptr _async_close_timer;
};
//该对象用于拦截感兴趣的MediaSourceEvent事件
class MediaSourceEventInterceptor : public MediaSourceEvent{
public:
MediaSourceEventInterceptor(){}
~MediaSourceEventInterceptor() override {}
void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener);
std::shared_ptr<MediaSourceEvent> getDelegate() const;
MediaOriginType getOriginType(MediaSource &sender) const override;
string getOriginUrl(MediaSource &sender) const override;
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
bool seekTo(MediaSource &sender, uint32_t stamp) override;
bool close(MediaSource &sender, bool force) override;
int totalReaderCount(MediaSource &sender) override;
void onReaderChanged(MediaSource &sender, int size) override;
void onRegist(MediaSource &sender, bool regist) override;
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
bool isRecording(MediaSource &sender, Recorder::type type) override;
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override;
bool stopSendRtp(MediaSource &sender) override;
private:
std::weak_ptr<MediaSourceEvent> _listener;
};
/**
* url获取媒体相关信息
*/
class MediaInfo{
public:
~MediaInfo() {}
MediaInfo() {}
MediaInfo(const string &url) { parse(url); }
void parse(const string &url);
public:
string _full_url;
string _schema;
string _host;
string _port;
string _vhost;
string _app;
string _streamid;
string _param_strs;
};
class BytesSpeed {
public:
BytesSpeed() = default;
~BytesSpeed() = default;
/**
*
*/
BytesSpeed& operator += (uint64_t bytes) {
_bytes += bytes;
if (_bytes > 1024 * 1024) {
//数据大于1MB就计算一次网速
computeSpeed();
}
return *this;
}
/**
* bytes/s
*/
int getSpeed() {
if (_ticker.elapsedTime() < 1000) {
//获取频率小于1秒那么返回上次计算结果
return _speed;
}
return computeSpeed();
}
private:
uint64_t computeSpeed() {
auto elapsed = _ticker.elapsedTime();
if (!elapsed) {
return _speed;
}
_speed = _bytes * 1000 / elapsed;
_ticker.resetTime();
_bytes = 0;
return _speed;
}
private:
int _speed = 0;
uint64_t _bytes = 0;
Ticker _ticker;
};
/**
* rtsp/rtmp的直播流都源自该对象
*/
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
public:
typedef std::shared_ptr<MediaSource> Ptr;
typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap;
typedef unordered_map<string, StreamMap > AppStreamMap;
typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;
MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ;
virtual ~MediaSource() ;
////////////////获取MediaSource相关信息////////////////
// 获取协议类型
const string& getSchema() const;
// 虚拟主机
const string& getVhost() const;
// 应用名
const string& getApp() const;
// 流id
const string& getId() const;
// 获取所有Track
vector<Track::Ptr> getTracks(bool ready = true) const override;
// 获取流当前时间戳
virtual uint32_t getTimeStamp(TrackType type) { return 0; };
// 设置时间戳
virtual void setTimeStamp(uint32_t stamp) {};
// 获取数据速率单位bytes/s
int getBytesSpeed();
// 获取流创建GMT unix时间戳单位秒
uint64_t getCreateStamp() const;
// 获取流上线时间,单位秒
uint64_t getAliveSecond() const;
////////////////MediaSourceEvent相关接口实现////////////////
// 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取监听者
std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const;
// 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
virtual int readerCount() = 0;
// 观看者个数,包括(hls/rtsp/rtmp)
virtual int totalReaderCount();
// 获取媒体源类型
MediaOriginType getOriginType() const;
// 获取媒体源url或者文件路径
string getOriginUrl() const;
// 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock() const;
// 拖动进度条
bool seekTo(uint32_t stamp);
// 关闭该流
bool close(bool force);
// 该流观看人数变化
void onReaderChanged(int size);
// 开启或关闭录制
bool setupRecord(Recorder::type type, bool start, const string &custom_path);
// 获取录制状态
bool isRecording(Recorder::type type);
// 开始发送ps-rtp
void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb);
// 停止发送ps-rtp
bool stopSendRtp();
////////////////static方法查找或生成MediaSource////////////////
// 同步查找流
static Ptr find(const string &schema, const string &vhost, const string &app, const string &id);
// 忽略类型同步查找流可能返回rtmp/rtsp/hls类型
static Ptr find(const string &vhost, const string &app, const string &stream_id);
// 异步查找流
static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
// 遍历所有流
static void for_each_media(const function<void(const Ptr &src)> &cb);
// 从mp4文件生成MediaSource
static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path = "", bool check_app = true);
protected:
//媒体注册
void regist();
private:
//媒体注销
bool unregist();
//触发媒体事件
void emitEvent(bool regist);
protected:
BytesSpeed _speed;
private:
time_t _create_stamp;
Ticker _ticker;
string _schema;
string _vhost;
string _app;
string _stream_id;
std::weak_ptr<MediaSourceEvent> _listener;
};
///缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size);
private:
uint64_t _last_stamp[2] = {0, 0};
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache(){
_cache = std::make_shared<packet_list>();
}
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
} /* namespace mediakit */
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H
#include <mutex>
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
using namespace std;
using namespace toolkit;
namespace toolkit{
class TcpSession;
}// namespace toolkit
namespace mediakit {
enum class MediaOriginType : uint8_t {
unknown = 0,
rtmp_push ,
rtsp_push,
rtp_push,
pull,
ffmpeg_pull,
mp4_vod,
device_chn
};
string getOriginTypeString(MediaOriginType type);
class MediaSource;
class MediaSourceEvent{
public:
friend class MediaSource;
MediaSourceEvent(){};
virtual ~MediaSourceEvent(){};
// 获取媒体源类型
virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; }
// 获取媒体源url或者文件路径
virtual string getOriginUrl(MediaSource &sender) const { return ""; }
// 获取媒体源客户端相关信息
virtual std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const { return nullptr; }
// 通知拖动进度条
virtual bool seekTo(MediaSource &sender, uint32_t stamp) { return false; }
// 通知其停止产生流
virtual bool close(MediaSource &sender, bool force) { return false; }
// 获取观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0;
// 通知观看人数变化
virtual void onReaderChanged(MediaSource &sender, int size);
//流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {};
////////////////////////仅供MultiMediaSourceMuxer对象继承////////////////////////
// 开启或关闭录制
virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; };
// 获取录制状态
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
// 获取所有track相关信息
virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); };
// 开始发送ps-rtp
virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));};
// 停止发送ps-rtp
virtual bool stopSendRtp(MediaSource &sender, const string &ssrc) {return false; }
private:
Timer::Ptr _async_close_timer;
};
//该对象用于拦截感兴趣的MediaSourceEvent事件
class MediaSourceEventInterceptor : public MediaSourceEvent{
public:
MediaSourceEventInterceptor(){}
~MediaSourceEventInterceptor() override {}
void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener);
std::shared_ptr<MediaSourceEvent> getDelegate() const;
MediaOriginType getOriginType(MediaSource &sender) const override;
string getOriginUrl(MediaSource &sender) const override;
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
bool seekTo(MediaSource &sender, uint32_t stamp) override;
bool close(MediaSource &sender, bool force) override;
int totalReaderCount(MediaSource &sender) override;
void onReaderChanged(MediaSource &sender, int size) override;
void onRegist(MediaSource &sender, bool regist) override;
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
bool isRecording(MediaSource &sender, Recorder::type type) override;
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) override;
bool stopSendRtp(MediaSource &sender, const string &ssrc) override;
private:
std::weak_ptr<MediaSourceEvent> _listener;
};
/**
* url获取媒体相关信息
*/
class MediaInfo{
public:
~MediaInfo() {}
MediaInfo() {}
MediaInfo(const string &url) { parse(url); }
void parse(const string &url);
public:
string _full_url;
string _schema;
string _host;
string _port;
string _vhost;
string _app;
string _streamid;
string _param_strs;
};
class BytesSpeed {
public:
BytesSpeed() = default;
~BytesSpeed() = default;
/**
*
*/
BytesSpeed& operator += (uint64_t bytes) {
_bytes += bytes;
if (_bytes > 1024 * 1024) {
//数据大于1MB就计算一次网速
computeSpeed();
}
return *this;
}
/**
* bytes/s
*/
int getSpeed() {
if (_ticker.elapsedTime() < 1000) {
//获取频率小于1秒那么返回上次计算结果
return _speed;
}
return computeSpeed();
}
private:
uint64_t computeSpeed() {
auto elapsed = _ticker.elapsedTime();
if (!elapsed) {
return _speed;
}
_speed = _bytes * 1000 / elapsed;
_ticker.resetTime();
_bytes = 0;
return _speed;
}
private:
int _speed = 0;
uint64_t _bytes = 0;
Ticker _ticker;
};
/**
* rtsp/rtmp的直播流都源自该对象
*/
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
public:
typedef std::shared_ptr<MediaSource> Ptr;
typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap;
typedef unordered_map<string, StreamMap > AppStreamMap;
typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;
MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ;
virtual ~MediaSource() ;
////////////////获取MediaSource相关信息////////////////
// 获取协议类型
const string& getSchema() const;
// 虚拟主机
const string& getVhost() const;
// 应用名
const string& getApp() const;
// 流id
const string& getId() const;
// 获取所有Track
vector<Track::Ptr> getTracks(bool ready = true) const override;
// 获取流当前时间戳
virtual uint32_t getTimeStamp(TrackType type) { return 0; };
// 设置时间戳
virtual void setTimeStamp(uint32_t stamp) {};
// 获取数据速率单位bytes/s
int getBytesSpeed();
// 获取流创建GMT unix时间戳单位秒
uint64_t getCreateStamp() const;
// 获取流上线时间,单位秒
uint64_t getAliveSecond() const;
////////////////MediaSourceEvent相关接口实现////////////////
// 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取监听者
std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const;
// 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
virtual int readerCount() = 0;
// 观看者个数,包括(hls/rtsp/rtmp)
virtual int totalReaderCount();
// 获取媒体源类型
MediaOriginType getOriginType() const;
// 获取媒体源url或者文件路径
string getOriginUrl() const;
// 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock() const;
// 拖动进度条
bool seekTo(uint32_t stamp);
// 关闭该流
bool close(bool force);
// 该流观看人数变化
void onReaderChanged(int size);
// 开启或关闭录制
bool setupRecord(Recorder::type type, bool start, const string &custom_path);
// 获取录制状态
bool isRecording(Recorder::type type);
// 开始发送ps-rtp
void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb);
// 停止发送ps-rtp
bool stopSendRtp(const string &ssrc);
////////////////static方法查找或生成MediaSource////////////////
// 同步查找流
static Ptr find(const string &schema, const string &vhost, const string &app, const string &id);
// 忽略类型同步查找流可能返回rtmp/rtsp/hls类型
static Ptr find(const string &vhost, const string &app, const string &stream_id);
// 异步查找流
static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
// 遍历所有流
static void for_each_media(const function<void(const Ptr &src)> &cb);
// 从mp4文件生成MediaSource
static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path = "", bool check_app = true);
protected:
//媒体注册
void regist();
private:
//媒体注销
bool unregist();
//触发媒体事件
void emitEvent(bool regist);
protected:
BytesSpeed _speed;
private:
time_t _create_stamp;
Ticker _ticker;
string _schema;
string _vhost;
string _app;
string _stream_id;
std::weak_ptr<MediaSourceEvent> _listener;
};
///缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size);
private:
uint64_t _last_stamp[2] = {0, 0};
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache(){
_cache = std::make_shared<packet_list>();
}
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
} /* namespace mediakit */
#endif //ZLMEDIAKIT_MEDIASOURCE_H

View File

@ -1,472 +1,480 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <math.h>
#include "Common/config.h"
#include "MultiMediaSourceMuxer.h"
namespace mediakit {
///////////////////////////////MultiMuxerPrivate//////////////////////////////////
MultiMuxerPrivate::~MultiMuxerPrivate() {}
MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_stream_url = vhost + " " + app + " " + stream;
if (enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec));
}
if (enable_rtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleSdp>(dur_sec));
}
if (enable_hls) {
_hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream));
}
if (enable_mp4) {
_mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream);
}
_ts = std::make_shared<TSMediaSourceMuxer>(vhost, app, stream);
#if defined(ENABLE_MP4)
_fmp4 = std::make_shared<FMP4MediaSourceMuxer>(vhost, app, stream);
#endif
}
void MultiMuxerPrivate::resetTracks() {
if (_rtmp) {
_rtmp->resetTracks();
}
if (_rtsp) {
_rtsp->resetTracks();
}
if (_ts) {
_ts->resetTracks();
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->resetTracks();
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls;
if (hls) {
hls->resetTracks();
}
auto mp4 = _mp4;
if (mp4) {
mp4->resetTracks();
}
}
void MultiMuxerPrivate::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
_listener = listener;
if (_rtmp) {
_rtmp->setListener(listener);
}
if (_rtsp) {
_rtsp->setListener(listener);
}
if (_ts) {
_ts->setListener(listener);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->setListener(listener);
}
#endif
auto hls = _hls;
if (hls) {
hls->setListener(listener);
}
}
int MultiMuxerPrivate::totalReaderCount() const {
auto hls = _hls;
return (_rtsp ? _rtsp->readerCount() : 0) +
(_rtmp ? _rtmp->readerCount() : 0) +
(_ts ? _ts->readerCount() : 0) +
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->readerCount() : 0) +
#endif
(hls ? hls->readerCount() : 0);
}
static std::shared_ptr<MediaSinkInterface> makeRecorder(const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path);
for (auto &track : tracks) {
recorder->addTrack(track);
}
return recorder;
}
//此函数可能跨线程调用
bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path){
switch (type) {
case Recorder::type_hls : {
if (start && !_hls) {
//开始录制
auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(getTracks(true), type, custom_path, sender));
if (hls) {
//设置HlsMediaSource的事件监听器
hls->setListener(_listener);
}
_hls = hls;
} else if (!start && _hls) {
//停止录制
_hls = nullptr;
}
return true;
}
case Recorder::type_mp4 : {
if (start && !_mp4) {
//开始录制
_mp4 = makeRecorder(getTracks(true), type, custom_path, sender);
} else if (!start && _mp4) {
//停止录制
_mp4 = nullptr;
}
return true;
}
default : return false;
}
}
//此函数可能跨线程调用
bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){
switch (type){
case Recorder::type_hls :
return _hls ? true : false;
case Recorder::type_mp4 :
return _mp4 ? true : false;
default:
return false;
}
}
void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) {
if (_rtmp) {
_rtmp->setTimeStamp(stamp);
}
if (_rtsp) {
_rtsp->setTimeStamp(stamp);
}
}
void MultiMuxerPrivate::setTrackListener(Listener *listener) {
_track_listener = listener;
}
void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
if (_rtmp) {
_rtmp->addTrack(track);
}
if (_rtsp) {
_rtsp->addTrack(track);
}
if (_ts) {
_ts->addTrack(track);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->addTrack(track);
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls;
if (hls) {
hls->addTrack(track);
}
auto mp4 = _mp4;
if (mp4) {
mp4->addTrack(track);
}
}
bool MultiMuxerPrivate::isEnabled(){
auto hls = _hls;
return (_rtmp ? _rtmp->isEnabled() : false) ||
(_rtsp ? _rtsp->isEnabled() : false) ||
(_ts ? _ts->isEnabled() : false) ||
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->isEnabled() : false) ||
#endif
(hls ? hls->isEnabled() : false) || _mp4;
}
void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) {
if (_rtmp) {
_rtmp->inputFrame(frame);
}
if (_rtsp) {
_rtsp->inputFrame(frame);
}
if (_ts) {
_ts->inputFrame(frame);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->inputFrame(frame);
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
//此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优
auto hls = _hls;
if (hls) {
hls->inputFrame(frame);
}
auto mp4 = _mp4;
if (mp4) {
mp4->inputFrame(frame);
}
}
static string getTrackInfoStr(const TrackSource *track_src){
_StrPrinter codec_info;
auto tracks = track_src->getTracks(true);
for (auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
break;
}
default:
break;
}
}
return codec_info;
}
void MultiMuxerPrivate::onAllTrackReady() {
if (_rtmp) {
_rtmp->onAllTrackReady();
}
if (_rtsp) {
_rtsp->onAllTrackReady();
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->onAllTrackReady();
}
#endif
if (_track_listener) {
_track_listener->onAllTrackReady();
}
InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
}
///////////////////////////////MultiMediaSourceMuxer//////////////////////////////////
MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4));
_muxer->setTrackListener(this);
}
void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
setDelegate(listener);
//拦截事件
_muxer->setMediaListener(shared_from_this());
}
void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener) {
_track_listener = listener;
}
int MultiMediaSourceMuxer::totalReaderCount() const {
return _muxer->totalReaderCount();
}
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
_muxer->setTimeStamp(stamp);
}
vector<Track::Ptr> MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const {
return _muxer->getTracks(trackReady);
}
int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
auto listener = getDelegate();
if (!listener) {
return totalReaderCount();
}
return listener->totalReaderCount(sender);
}
bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
return _muxer->setupRecord(sender, type, start, custom_path);
}
bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) {
return _muxer->isRecording(sender,type);
}
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
#if defined(ENABLE_RTPPROXY)
RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data()));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) {
cb(ex);
auto strong_self = weak_self.lock();
if (!strong_self || ex) {
return;
}
for (auto &track : strong_self->_muxer->getTracks(false)) {
rtp_sender->addTrack(track);
}
rtp_sender->addTrackCompleted();
strong_self->_rtp_sender = rtp_sender;
});
#else
cb(SockException(Err_other, "该功能未启用编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){
#if defined(ENABLE_RTPPROXY)
if (_rtp_sender) {
_rtp_sender = nullptr;
return true;
}
#endif//ENABLE_RTPPROXY
return false;
}
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
_muxer->addTrack(track);
}
void MultiMediaSourceMuxer::addTrackCompleted() {
_muxer->addTrackCompleted();
}
void MultiMediaSourceMuxer::onAllTrackReady(){
_muxer->setMediaListener(shared_from_this());
auto listener = _track_listener.lock();
if(listener){
listener->onAllTrackReady();
}
}
void MultiMediaSourceMuxer::resetTracks() {
_muxer->resetTracks();
}
//该类实现frame级别的时间戳覆盖
class FrameModifyStamp : public Frame{
public:
typedef std::shared_ptr<FrameModifyStamp> Ptr;
FrameModifyStamp(const Frame::Ptr &frame, Stamp &stamp){
_frame = frame;
//覆盖时间戳
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, true);
}
~FrameModifyStamp() override {}
uint32_t dts() const override{
return _dts;
}
uint32_t pts() const override{
return _pts;
}
uint32_t prefixSize() const override {
return _frame->prefixSize();
}
bool keyFrame() const override {
return _frame->keyFrame();
}
bool configFrame() const override {
return _frame->configFrame();
}
bool cacheAble() const override {
return _frame->cacheAble();
}
char *data() const override {
return _frame->data();
}
uint32_t size() const override {
return _frame->size();
}
CodecId getCodecId() const override {
return _frame->getCodecId();
}
private:
int64_t _dts;
int64_t _pts;
Frame::Ptr _frame;
};
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
auto frame = frame_in;
if (modify_stamp) {
//开启了时间戳覆盖
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
}
_muxer->inputFrame(frame);
#if defined(ENABLE_RTPPROXY)
auto rtp_sender = _rtp_sender;
if (rtp_sender) {
rtp_sender->inputFrame(frame);
}
#endif //ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::isEnabled(){
GET_CONFIG(uint32_t, stream_none_reader_delay_ms, General::kStreamNoneReaderDelayMS);
if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) {
//无人观看时,每次检查是否真的无人观看
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
#if defined(ENABLE_RTPPROXY)
_is_enable = (_muxer->isEnabled() || _rtp_sender);
#else
_is_enable = _muxer->isEnabled();
#endif //ENABLE_RTPPROXY
if (_is_enable) {
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍所以刷新计数器无意义且浪费cpu
_last_check.resetTime();
}
}
return _is_enable;
}
}//namespace mediakit
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <math.h>
#include "Common/config.h"
#include "MultiMediaSourceMuxer.h"
namespace mediakit {
///////////////////////////////MultiMuxerPrivate//////////////////////////////////
MultiMuxerPrivate::~MultiMuxerPrivate() {}
MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_stream_url = vhost + " " + app + " " + stream;
if (enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec));
}
if (enable_rtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleSdp>(dur_sec));
}
if (enable_hls) {
_hls = dynamic_pointer_cast<HlsRecorder>(Recorder::createRecorder(Recorder::type_hls, vhost, app, stream));
}
if (enable_mp4) {
_mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream);
}
_ts = std::make_shared<TSMediaSourceMuxer>(vhost, app, stream);
#if defined(ENABLE_MP4)
_fmp4 = std::make_shared<FMP4MediaSourceMuxer>(vhost, app, stream);
#endif
}
void MultiMuxerPrivate::resetTracks() {
if (_rtmp) {
_rtmp->resetTracks();
}
if (_rtsp) {
_rtsp->resetTracks();
}
if (_ts) {
_ts->resetTracks();
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->resetTracks();
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls;
if (hls) {
hls->resetTracks();
}
auto mp4 = _mp4;
if (mp4) {
mp4->resetTracks();
}
}
void MultiMuxerPrivate::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
_listener = listener;
if (_rtmp) {
_rtmp->setListener(listener);
}
if (_rtsp) {
_rtsp->setListener(listener);
}
if (_ts) {
_ts->setListener(listener);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->setListener(listener);
}
#endif
auto hls = _hls;
if (hls) {
hls->setListener(listener);
}
}
int MultiMuxerPrivate::totalReaderCount() const {
auto hls = _hls;
return (_rtsp ? _rtsp->readerCount() : 0) +
(_rtmp ? _rtmp->readerCount() : 0) +
(_ts ? _ts->readerCount() : 0) +
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->readerCount() : 0) +
#endif
(hls ? hls->readerCount() : 0);
}
static std::shared_ptr<MediaSinkInterface> makeRecorder(const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path);
for (auto &track : tracks) {
recorder->addTrack(track);
}
return recorder;
}
//此函数可能跨线程调用
bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path){
switch (type) {
case Recorder::type_hls : {
if (start && !_hls) {
//开始录制
auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(getTracks(true), type, custom_path, sender));
if (hls) {
//设置HlsMediaSource的事件监听器
hls->setListener(_listener);
}
_hls = hls;
} else if (!start && _hls) {
//停止录制
_hls = nullptr;
}
return true;
}
case Recorder::type_mp4 : {
if (start && !_mp4) {
//开始录制
_mp4 = makeRecorder(getTracks(true), type, custom_path, sender);
} else if (!start && _mp4) {
//停止录制
_mp4 = nullptr;
}
return true;
}
default : return false;
}
}
//此函数可能跨线程调用
bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){
switch (type){
case Recorder::type_hls :
return _hls ? true : false;
case Recorder::type_mp4 :
return _mp4 ? true : false;
default:
return false;
}
}
void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) {
if (_rtmp) {
_rtmp->setTimeStamp(stamp);
}
if (_rtsp) {
_rtsp->setTimeStamp(stamp);
}
}
void MultiMuxerPrivate::setTrackListener(Listener *listener) {
_track_listener = listener;
}
void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
if (_rtmp) {
_rtmp->addTrack(track);
}
if (_rtsp) {
_rtsp->addTrack(track);
}
if (_ts) {
_ts->addTrack(track);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->addTrack(track);
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls;
if (hls) {
hls->addTrack(track);
}
auto mp4 = _mp4;
if (mp4) {
mp4->addTrack(track);
}
}
bool MultiMuxerPrivate::isEnabled(){
auto hls = _hls;
return (_rtmp ? _rtmp->isEnabled() : false) ||
(_rtsp ? _rtsp->isEnabled() : false) ||
(_ts ? _ts->isEnabled() : false) ||
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->isEnabled() : false) ||
#endif
(hls ? hls->isEnabled() : false) || _mp4;
}
void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) {
if (_rtmp) {
_rtmp->inputFrame(frame);
}
if (_rtsp) {
_rtsp->inputFrame(frame);
}
if (_ts) {
_ts->inputFrame(frame);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->inputFrame(frame);
}
#endif
//拷贝智能指针目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
//此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优
auto hls = _hls;
if (hls) {
hls->inputFrame(frame);
}
auto mp4 = _mp4;
if (mp4) {
mp4->inputFrame(frame);
}
}
static string getTrackInfoStr(const TrackSource *track_src){
_StrPrinter codec_info;
auto tracks = track_src->getTracks(true);
for (auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
break;
}
default:
break;
}
}
return codec_info;
}
void MultiMuxerPrivate::onAllTrackReady() {
if (_rtmp) {
_rtmp->onAllTrackReady();
}
if (_rtsp) {
_rtsp->onAllTrackReady();
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->onAllTrackReady();
}
#endif
if (_track_listener) {
_track_listener->onAllTrackReady();
}
InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
}
///////////////////////////////MultiMediaSourceMuxer//////////////////////////////////
MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4));
_muxer->setTrackListener(this);
}
void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
setDelegate(listener);
//拦截事件
_muxer->setMediaListener(shared_from_this());
}
void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener) {
_track_listener = listener;
}
int MultiMediaSourceMuxer::totalReaderCount() const {
return _muxer->totalReaderCount();
}
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
_muxer->setTimeStamp(stamp);
}
vector<Track::Ptr> MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const {
return _muxer->getTracks(trackReady);
}
int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
auto listener = getDelegate();
if (!listener) {
return totalReaderCount();
}
return listener->totalReaderCount(sender);
}
bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
return _muxer->setupRecord(sender, type, start, custom_path);
}
bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) {
return _muxer->isRecording(sender,type);
}
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb){
#if defined(ENABLE_RTPPROXY)
RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data()));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](const SockException &ex) {
cb(ex);
auto strong_self = weak_self.lock();
if (!strong_self || ex) {
return;
}
for (auto &track : strong_self->_muxer->getTracks(false)) {
rtp_sender->addTrack(track);
}
rtp_sender->addTrackCompleted();
strong_self->_rtp_sender[ssrc] = rtp_sender;
});
#else
cb(SockException(Err_other, "该功能未启用编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){
#if defined(ENABLE_RTPPROXY)
map<string, RtpSender::Ptr>::iterator ite = _rtp_sender.find(ssrc);
if (ite != _rtp_sender.end())
{
ite->second = nullptr;
_rtp_sender.erase(ite);
return true;
}
#endif//ENABLE_RTPPROXY
return false;
}
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
_muxer->addTrack(track);
}
void MultiMediaSourceMuxer::addTrackCompleted() {
_muxer->addTrackCompleted();
}
void MultiMediaSourceMuxer::onAllTrackReady(){
_muxer->setMediaListener(shared_from_this());
auto listener = _track_listener.lock();
if(listener){
listener->onAllTrackReady();
}
}
void MultiMediaSourceMuxer::resetTracks() {
_muxer->resetTracks();
}
//该类实现frame级别的时间戳覆盖
class FrameModifyStamp : public Frame{
public:
typedef std::shared_ptr<FrameModifyStamp> Ptr;
FrameModifyStamp(const Frame::Ptr &frame, Stamp &stamp){
_frame = frame;
//覆盖时间戳
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, true);
}
~FrameModifyStamp() override {}
uint32_t dts() const override{
return _dts;
}
uint32_t pts() const override{
return _pts;
}
uint32_t prefixSize() const override {
return _frame->prefixSize();
}
bool keyFrame() const override {
return _frame->keyFrame();
}
bool configFrame() const override {
return _frame->configFrame();
}
bool cacheAble() const override {
return _frame->cacheAble();
}
char *data() const override {
return _frame->data();
}
uint32_t size() const override {
return _frame->size();
}
CodecId getCodecId() const override {
return _frame->getCodecId();
}
private:
int64_t _dts;
int64_t _pts;
Frame::Ptr _frame;
};
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
auto frame = frame_in;
if (modify_stamp) {
//开启了时间戳覆盖
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
}
_muxer->inputFrame(frame);
#if defined(ENABLE_RTPPROXY)
map<string, RtpSender::Ptr>::iterator ite = _rtp_sender.begin();
while (ite != _rtp_sender.end())
{
if (ite->second)
{
ite->second->inputFrame(frame);
}
ite++;
}
#endif //ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::isEnabled(){
GET_CONFIG(uint32_t, stream_none_reader_delay_ms, General::kStreamNoneReaderDelayMS);
if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) {
//无人观看时,每次检查是否真的无人观看
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
#if defined(ENABLE_RTPPROXY)
_is_enable = (_muxer->isEnabled() || _rtp_sender.size());
#else
_is_enable = _muxer->isEnabled();
#endif //ENABLE_RTPPROXY
if (_is_enable) {
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍所以刷新计数器无意义且浪费cpu
_last_check.resetTime();
}
}
return _is_enable;
}
}//namespace mediakit

View File

@ -1,197 +1,197 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#include "Common/Stamp.h"
#include "Rtp/RtpSender.h"
#include "Record/Recorder.h"
#include "Record/HlsRecorder.h"
#include "Record/HlsMediaSource.h"
#include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "TS/TSMediaSourceMuxer.h"
#include "FMP4/FMP4MediaSourceMuxer.h"
namespace mediakit{
class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{
public:
friend class MultiMediaSourceMuxer;
typedef std::shared_ptr<MultiMuxerPrivate> Ptr;
class Listener{
public:
Listener() = default;
virtual ~Listener() = default;
virtual void onAllTrackReady() = 0;
};
~MultiMuxerPrivate() override;
private:
MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4);
void resetTracks() override;
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
int totalReaderCount() const;
void setTimeStamp(uint32_t stamp);
void setTrackListener(Listener *listener);
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path);
bool isRecording(MediaSource &sender, Recorder::type type);
bool isEnabled();
void onTrackReady(const Track::Ptr & track) override;
void onTrackFrame(const Frame::Ptr &frame) override;
void onAllTrackReady() override;
private:
string _stream_url;
Listener *_track_listener = nullptr;
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
HlsRecorder::Ptr _hls;
MediaSinkInterface::Ptr _mp4;
TSMediaSourceMuxer::Ptr _ts;
#if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4;
#endif
std::weak_ptr<MediaSourceEvent> _listener;
};
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
typedef MultiMuxerPrivate::Listener Listener;
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
~MultiMediaSourceMuxer() override;
MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0,
bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false);
/**
*
* @param listener
*/
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
/**
* Track就绪事件监听器
* @param listener
*/
void setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener);
/**
*
*/
int totalReaderCount() const;
/**
* ()
*/
bool isEnabled();
/**
* MediaSource时间戳
* @param stamp
*/
void setTimeStamp(uint32_t stamp);
/////////////////////////////////MediaSourceEvent override/////////////////////////////////
/**
* Track
* @param trackReady track
* @return Track
*/
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
/**
*
* @param sender
* @return
*/
int totalReaderCount(MediaSource &sender) override;
/**
*
* @param type
* @param start
* @param custom_path
* @return
*/
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
/**
*
* @param type
* @return
*/
bool isRecording(MediaSource &sender, Recorder::type type) override;
/**
* ps-rtp流
* @param dst_url ip或域名
* @param dst_port
* @param ssrc rtp的ssrc
* @param is_udp udp
* @param cb
*/
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override;
/**
* ps-rtp发送
* @return
*/
bool stopSendRtp(MediaSource &sender) override;
/////////////////////////////////MediaSinkInterface override/////////////////////////////////
/**
* trackTrack的clone方法
* sps pps这些信息 Delegate相关关系
* @param track
*/
void addTrack(const Track::Ptr &track) override;
/**
* track完毕
*/
void addTrackCompleted() override;
/**
* track
*/
void resetTracks() override;
/**
*
* @param frame
*/
void inputFrame(const Frame::Ptr &frame) override;
/////////////////////////////////MultiMuxerPrivate::Listener override/////////////////////////////////
/**
* track全部就绪
*/
void onAllTrackReady() override;
private:
bool _is_enable = false;
Ticker _last_check;
Stamp _stamp[2];
MultiMuxerPrivate::Ptr _muxer;
std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener;
#if defined(ENABLE_RTPPROXY)
RtpSender::Ptr _rtp_sender;
#endif //ENABLE_RTPPROXY
};
}//namespace mediakit
#endif //ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#include "Common/Stamp.h"
#include "Rtp/RtpSender.h"
#include "Record/Recorder.h"
#include "Record/HlsRecorder.h"
#include "Record/HlsMediaSource.h"
#include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "TS/TSMediaSourceMuxer.h"
#include "FMP4/FMP4MediaSourceMuxer.h"
namespace mediakit{
class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{
public:
friend class MultiMediaSourceMuxer;
typedef std::shared_ptr<MultiMuxerPrivate> Ptr;
class Listener{
public:
Listener() = default;
virtual ~Listener() = default;
virtual void onAllTrackReady() = 0;
};
~MultiMuxerPrivate() override;
private:
MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4);
void resetTracks() override;
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
int totalReaderCount() const;
void setTimeStamp(uint32_t stamp);
void setTrackListener(Listener *listener);
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path);
bool isRecording(MediaSource &sender, Recorder::type type);
bool isEnabled();
void onTrackReady(const Track::Ptr & track) override;
void onTrackFrame(const Frame::Ptr &frame) override;
void onAllTrackReady() override;
private:
string _stream_url;
Listener *_track_listener = nullptr;
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
HlsRecorder::Ptr _hls;
MediaSinkInterface::Ptr _mp4;
TSMediaSourceMuxer::Ptr _ts;
#if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4;
#endif
std::weak_ptr<MediaSourceEvent> _listener;
};
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
typedef MultiMuxerPrivate::Listener Listener;
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
~MultiMediaSourceMuxer() override;
MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0,
bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false);
/**
*
* @param listener
*/
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
/**
* Track就绪事件监听器
* @param listener
*/
void setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener);
/**
*
*/
int totalReaderCount() const;
/**
* ()
*/
bool isEnabled();
/**
* MediaSource时间戳
* @param stamp
*/
void setTimeStamp(uint32_t stamp);
/////////////////////////////////MediaSourceEvent override/////////////////////////////////
/**
* Track
* @param trackReady track
* @return Track
*/
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
/**
*
* @param sender
* @return
*/
int totalReaderCount(MediaSource &sender) override;
/**
*
* @param type
* @param start
* @param custom_path
* @return
*/
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
/**
*
* @param type
* @return
*/
bool isRecording(MediaSource &sender, Recorder::type type) override;
/**
* ps-rtp流
* @param dst_url ip或域名
* @param dst_port
* @param ssrc rtp的ssrc
* @param is_udp udp
* @param cb
*/
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb) override;
/**
* ps-rtp发送
* @return
*/
bool stopSendRtp(MediaSource &sender, const string &ssrc) override;
/////////////////////////////////MediaSinkInterface override/////////////////////////////////
/**
* trackTrack的clone方法
* sps pps这些信息 Delegate相关关系
* @param track
*/
void addTrack(const Track::Ptr &track) override;
/**
* track完毕
*/
void addTrackCompleted() override;
/**
* track
*/
void resetTracks() override;
/**
*
* @param frame
*/
void inputFrame(const Frame::Ptr &frame) override;
/////////////////////////////////MultiMuxerPrivate::Listener override/////////////////////////////////
/**
* track全部就绪
*/
void onAllTrackReady() override;
private:
bool _is_enable = false;
Ticker _last_check;
Stamp _stamp[2];
MultiMuxerPrivate::Ptr _muxer;
std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener;
#if defined(ENABLE_RTPPROXY)
map<string, RtpSender::Ptr> _rtp_sender;
#endif //ENABLE_RTPPROXY
};
}//namespace mediakit
#endif //ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H

View File

@ -1,164 +1,165 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "RtpSender.h"
#include "Rtsp/RtspSession.h"
#include "Thread/WorkThreadPool.h"
#include "RtpCache.h"
namespace mediakit{
RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) {
_poller = EventPollerPool::Instance().getPoller();
_interface = std::make_shared<RtpCachePS>([this](std::shared_ptr<List<Buffer::Ptr> > list) {
onFlushRtpList(std::move(list));
}, ssrc, payload_type);
}
RtpSender::~RtpSender() {
}
void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb){
_is_udp = is_udp;
_socket = Socket::createSocket(_poller, false);
_dst_url = dst_url;
_dst_port = dst_port;
weak_ptr<RtpSender> weak_self = shared_from_this();
if (is_udp) {
_socket->bindUdpSock(0);
auto poller = _poller;
WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() {
struct sockaddr addr;
//切换线程目的是为了dns解析放在后台线程执行
if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) {
poller->async([dst_url, cb]() {
//切回自己的线程
cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url));
});
return;
}
//dns解析成功
poller->async([addr, weak_self, cb]() {
//切回自己的线程
cb(SockException());
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->_socket->setSendPeerAddr(&addr);
strong_self->onConnect();
}
});
});
} else {
_socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) {
cb(err);
auto strong_self = weak_self.lock();
if (strong_self && !err) {
//tcp连接成功
strong_self->onConnect();
}
});
}
}
void RtpSender::onConnect(){
_is_connect = true;
//加大发送缓存,防止udp丢包之类的问题
SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024);
if (!_is_udp) {
//关闭tcp no_delay并开启MSG_MORE, 提高发送性能
SockUtil::setNoDelay(_socket->rawFD(), false);
_socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
//连接建立成功事件
weak_ptr<RtpSender> weak_self = shared_from_this();
_socket->setOnErr([weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onErr(err);
}
});
InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp;
}
void RtpSender::addTrack(const Track::Ptr &track){
_interface->addTrack(track);
}
void RtpSender::addTrackCompleted(){
_interface->addTrackCompleted();
}
void RtpSender::resetTracks(){
_interface->resetTracks();
}
//此函数在其他线程执行
void RtpSender::inputFrame(const Frame::Ptr &frame) {
if (_is_connect) {
//连接成功后才做实质操作(节省cpu资源)
_interface->inputFrame(frame);
}
}
//此函数在其他线程执行
void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
if(!_is_connect){
//连接成功后才能发送数据
return;
}
auto is_udp = _is_udp;
auto socket = _socket;
_poller->async([rtp_list, is_udp, socket]() {
int i = 0;
int size = rtp_list->size();
rtp_list->for_each([&](Buffer::Ptr &packet) {
if (is_udp) {
//udp模式rtp over tcp前4个字节可以忽略
socket->send(std::make_shared<BufferRtp>(std::move(packet), 4), nullptr, 0, ++i == size);
} else {
//tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
socket->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
}
});
});
}
void RtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false;
//监听socket断开事件方便重连
if (is_connect) {
WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what();
} else {
WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what();
}
weak_ptr<RtpSender> weak_self = shared_from_this();
_connect_timer = std::make_shared<Timer>(10.0, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){
auto strong_self = weak_self.lock();
if (strong_self && ex) {
//连接失败且本对象未销毁,那么重试连接
strong_self->onErr(ex, true);
}
});
return false;
}, _poller);
}
}//namespace mediakit
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "RtpSender.h"
#include "Rtsp/RtspSession.h"
#include "Thread/WorkThreadPool.h"
#include "RtpCache.h"
namespace mediakit{
RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) {
_poller = EventPollerPool::Instance().getPoller();
_interface = std::make_shared<RtpCachePS>([this](std::shared_ptr<List<Buffer::Ptr> > list) {
onFlushRtpList(std::move(list));
}, ssrc, payload_type);
}
RtpSender::~RtpSender() {
}
void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb){
_is_udp = is_udp;
_socket = Socket::createSocket(_poller, false);
_dst_url = dst_url;
_dst_port = dst_port;
_src_port = src_port;
weak_ptr<RtpSender> weak_self = shared_from_this();
if (is_udp) {
_socket->bindUdpSock(src_port);
auto poller = _poller;
WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() {
struct sockaddr addr;
//切换线程目的是为了dns解析放在后台线程执行
if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) {
poller->async([dst_url, cb]() {
//切回自己的线程
cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url));
});
return;
}
//dns解析成功
poller->async([addr, weak_self, cb]() {
//切回自己的线程
cb(SockException());
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->_socket->setSendPeerAddr(&addr);
strong_self->onConnect();
}
});
});
} else {
_socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) {
cb(err);
auto strong_self = weak_self.lock();
if (strong_self && !err) {
//tcp连接成功
strong_self->onConnect();
}
}, 5.0F, "0.0.0.0", src_port);
}
}
void RtpSender::onConnect(){
_is_connect = true;
//加大发送缓存,防止udp丢包之类的问题
SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024);
if (!_is_udp) {
//关闭tcp no_delay并开启MSG_MORE, 提高发送性能
SockUtil::setNoDelay(_socket->rawFD(), false);
_socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
//连接建立成功事件
weak_ptr<RtpSender> weak_self = shared_from_this();
_socket->setOnErr([weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onErr(err);
}
});
InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp;
}
void RtpSender::addTrack(const Track::Ptr &track){
_interface->addTrack(track);
}
void RtpSender::addTrackCompleted(){
_interface->addTrackCompleted();
}
void RtpSender::resetTracks(){
_interface->resetTracks();
}
//此函数在其他线程执行
void RtpSender::inputFrame(const Frame::Ptr &frame) {
if (_is_connect) {
//连接成功后才做实质操作(节省cpu资源)
_interface->inputFrame(frame);
}
}
//此函数在其他线程执行
void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
if(!_is_connect){
//连接成功后才能发送数据
return;
}
auto is_udp = _is_udp;
auto socket = _socket;
_poller->async([rtp_list, is_udp, socket]() {
int i = 0;
int size = rtp_list->size();
rtp_list->for_each([&](Buffer::Ptr &packet) {
if (is_udp) {
//udp模式rtp over tcp前4个字节可以忽略
socket->send(std::make_shared<BufferRtp>(std::move(packet), 4), nullptr, 0, ++i == size);
} else {
//tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
socket->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
}
});
});
}
void RtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false;
//监听socket断开事件方便重连
if (is_connect) {
WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what();
} else {
WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what();
}
weak_ptr<RtpSender> weak_self = shared_from_this();
_connect_timer = std::make_shared<Timer>(10.0, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](const SockException &ex){
auto strong_self = weak_self.lock();
if (strong_self && ex) {
//连接失败且本对象未销毁,那么重试连接
strong_self->onErr(ex, true);
}
});
return false;
}, _poller);
}
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY)

View File

@ -1,85 +1,86 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_RTPSENDER_H
#define ZLMEDIAKIT_RTPSENDER_H
#if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h"
#include "Extension/CommonRtp.h"
namespace mediakit{
//rtp发送客户端支持发送GB28181协议
class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{
public:
typedef std::shared_ptr<RtpSender> Ptr;
~RtpSender() override;
/**
* GB28181 RTP发送客户端
* @param ssrc rtp的ssrc
* @param payload_type ps-rtp的pt一般为96
*/
RtpSender(uint32_t ssrc, uint8_t payload_type = 96);
/**
* ps-rtp包
* @param dst_url ip或域名
* @param dst_port
* @param is_udp udp方式发送rtp
* @param cb
*/
void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb);
/**
*
*/
void inputFrame(const Frame::Ptr &frame) override;
/**
* trackTrack的clone方法
* sps pps这些信息 Delegate相关关系
* @param track
*/
virtual void addTrack(const Track::Ptr & track) override;
/**
* Track完毕
*/
virtual void addTrackCompleted() override;
/**
* track
*/
virtual void resetTracks() override;
private:
//合并写输出
void onFlushRtpList(std::shared_ptr<List<Buffer::Ptr> > rtp_list);
//udp/tcp连接成功回调
void onConnect();
//异常断开socket事件
void onErr(const SockException &ex, bool is_connect = false);
private:
bool _is_udp;
bool _is_connect = false;
string _dst_url;
uint16_t _dst_port;
Socket::Ptr _socket;
EventPoller::Ptr _poller;
Timer::Ptr _connect_timer;
MediaSinkInterface::Ptr _interface;
};
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_RTPSENDER_H
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_RTPSENDER_H
#define ZLMEDIAKIT_RTPSENDER_H
#if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h"
#include "Extension/CommonRtp.h"
namespace mediakit{
//rtp发送客户端支持发送GB28181协议
class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{
public:
typedef std::shared_ptr<RtpSender> Ptr;
~RtpSender() override;
/**
* GB28181 RTP发送客户端
* @param ssrc rtp的ssrc
* @param payload_type ps-rtp的pt一般为96
*/
RtpSender(uint32_t ssrc, uint8_t payload_type = 96);
/**
* ps-rtp包
* @param dst_url ip或域名
* @param dst_port
* @param is_udp udp方式发送rtp
* @param cb
*/
void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function<void(const SockException &ex)> &cb);
/**
*
*/
void inputFrame(const Frame::Ptr &frame) override;
/**
* trackTrack的clone方法
* sps pps这些信息 Delegate相关关系
* @param track
*/
virtual void addTrack(const Track::Ptr & track) override;
/**
* Track完毕
*/
virtual void addTrackCompleted() override;
/**
* track
*/
virtual void resetTracks() override;
private:
//合并写输出
void onFlushRtpList(std::shared_ptr<List<Buffer::Ptr> > rtp_list);
//udp/tcp连接成功回调
void onConnect();
//异常断开socket事件
void onErr(const SockException &ex, bool is_connect = false);
private:
bool _is_udp;
bool _is_connect = false;
string _dst_url;
uint16_t _dst_port;
uint16_t _src_port;
Socket::Ptr _socket;
EventPoller::Ptr _poller;
Timer::Ptr _connect_timer;
MediaSinkInterface::Ptr _interface;
};
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_RTPSENDER_H

View File

@ -1,94 +1,102 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "RtpServer.h"
#include "RtpSelector.h"
namespace mediakit{
RtpServer::RtpServer() {
}
RtpServer::~RtpServer() {
if(_on_clearup){
_on_clearup();
}
}
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
//创建udp服务器
Socket::Ptr udp_server = Socket::createSocket(nullptr, false);
if (local_port == 0) {
//随机端口rtp端口采用偶数
Socket::Ptr rtcp_server = Socket::createSocket(nullptr, false);
auto pair = std::make_pair(udp_server, rtcp_server);
makeSockPair(pair, local_ip);
//取偶数端口
udp_server = pair.first;
} else if (!udp_server->bindUdpSock(local_port, local_ip)) {
//用户指定端口
throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true));
}
//设置udp socket读缓存
SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024);
TcpServer::Ptr tcp_server;
if (enable_tcp) {
//创建tcp服务器
tcp_server = std::make_shared<TcpServer>(udp_server->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id;
tcp_server->start<RtpSession>(udp_server->get_local_port(), local_ip);
}
RtpProcess::Ptr process;
if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
process = RtpSelector::Instance().getProcess(stream_id, true);
udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
process->inputRtp(true, udp_server, buf->data(), buf->size(), addr);
});
} else {
//未指定流id一个端口多个流通过ssrc来分流
auto &ref = RtpSelector::Instance();
udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(udp_server, buf->data(), buf->size(), addr);
});
}
_on_clearup = [udp_server, process, stream_id]() {
//去除循环引用
udp_server->setOnRead(nullptr);
if (process) {
//删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
}
};
_tcp_server = tcp_server;
_udp_server = udp_server;
_rtp_process = process;
}
void RtpServer::setOnDetach(const function<void()> &cb){
if(_rtp_process){
_rtp_process->setOnDetach(cb);
}
}
EventPoller::Ptr RtpServer::getPoller() {
return _udp_server->getPoller();
}
uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->get_local_port() : 0;
}
}//namespace mediakit
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "RtpServer.h"
#include "RtpSelector.h"
namespace mediakit{
RtpServer::RtpServer() {
}
RtpServer::~RtpServer() {
if(_on_clearup){
_on_clearup();
}
}
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
//创建udp服务器
Socket::Ptr udp_server = Socket::createSocket(nullptr, false);
if (local_port == 0) {
//随机端口rtp端口采用偶数
Socket::Ptr rtcp_server = Socket::createSocket(nullptr, false);
auto pair = std::make_pair(udp_server, rtcp_server);
makeSockPair(pair, local_ip);
//取偶数端口
udp_server = pair.first;
} else if (!udp_server->bindUdpSock(local_port, local_ip)) {
//用户指定端口
throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true));
}
//设置udp socket读缓存
SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024);
TcpServer::Ptr tcp_server;
if (enable_tcp) {
//创建tcp服务器
tcp_server = std::make_shared<TcpServer>(udp_server->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id;
tcp_server->start<RtpSession>(udp_server->get_local_port(), local_ip);
}
RtpProcess::Ptr process;
if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
process = RtpSelector::Instance().getProcess(stream_id, true);
//udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
// process->inputRtp(true, udp_server, buf->data(), buf->size(), addr);
//});
weak_ptr<Socket> weak_sock = udp_server;
udp_server->setOnRead([weak_sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
process->inputRtp(true, weak_sock.lock(), buf->data(), buf->size(), addr);
});
} else {
//未指定流id一个端口多个流通过ssrc来分流
auto &ref = RtpSelector::Instance();
//udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
// ref.inputRtp(udp_server, buf->data(), buf->size(), addr);
//});
weak_ptr<Socket> weak_sock = udp_server;
udp_server->setOnRead([&ref, weak_sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(weak_sock.lock(), buf->data(), buf->size(), addr);
});
}
_on_clearup = [udp_server, process, stream_id]() {
//去除循环引用
//udp_server->setOnRead(nullptr);
if (process) {
//删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
}
};
_tcp_server = tcp_server;
_udp_server = udp_server;
_rtp_process = process;
}
void RtpServer::setOnDetach(const function<void()> &cb){
if(_rtp_process){
_rtp_process->setOnDetach(cb);
}
}
EventPoller::Ptr RtpServer::getPoller() {
return _udp_server->getPoller();
}
uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->get_local_port() : 0;
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)