Merge pull request #1 from zlmediakit/master

update
This commit is contained in:
baiyfcu 2019-07-26 13:36:30 +08:00 committed by GitHub
commit 2d16e39445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 672 additions and 299 deletions

@ -1 +1 @@
Subproject commit c3acf2bd7fff96651da4f77d47e7e0aeb728e5d0
Subproject commit fe572323b10d72819a4d69b326dd70e73c7bf1a6

@ -1 +1 @@
Subproject commit 0e726dd4e06ab4ed3723deaf3f73386e100bb10d
Subproject commit e399b93802610dcf574ff64bcb7677572cd028c1

View File

@ -102,112 +102,158 @@ static onceToken token1([](){
#define REALM "realm_zlmedaikit"
static map<string,FlvRecorder::Ptr> s_mapFlvRecorder;
static mutex s_mtxFlvRecorder;
static onceToken s_token([](){
//监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnGetRtspRealm,[](BroadcastOnGetRtspRealmArgs){
DebugL << "RTSP是否需要鉴权事件" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ;
if(string("1") == args._streamid ){
// live/1需要认证
//该流需要认证并且设置realm
invoker(REALM);
}else{
//有时我们要查询redis或数据库来判断该流是否需要认证通过invoker的方式可以做到完全异步
//该流我们不需要认证
invoker("");
}
});
//监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnRtspAuth,[](BroadcastOnRtspAuthArgs){
DebugL << "RTSP播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ;
DebugL << "RTSP用户" << user_name << (must_no_encrypt ? " Base64" : " MD5" )<< " 方式登录";
string user = user_name;
//假设我们异步读取数据库
if(user == "test0"){
//假设数据库保存的是明文
invoker(false,"pwd0");
return;
}
static void initEvent() {
static onceToken s_token([]() {
//监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm,
[](BroadcastOnGetRtspRealmArgs) {
DebugL << "RTSP是否需要鉴权事件" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " "
<< args._param_strs;
if (string("1") == args._streamid) {
// live/1需要认证
//该流需要认证并且设置realm
invoker(REALM);
} else {
//有时我们要查询redis或数据库来判断该流是否需要认证通过invoker的方式可以做到完全异步
//该流我们不需要认证
invoker("");
}
});
if(user == "test1"){
//假设数据库保存的是密文
auto encrypted_pwd = MD5(user + ":" + REALM + ":" + "pwd1").hexdigest();
invoker(true,encrypted_pwd);
return;
}
if(user == "test2" && must_no_encrypt){
//假设登录的是test2,并且以base64方式登录此时我们提供加密密码那么会导致认证失败
//可以通过这个方式屏蔽base64这种不安全的加密方式
invoker(true,"pwd2");
return;
}
//监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnRtspAuth,
[](BroadcastOnRtspAuthArgs) {
DebugL << "RTSP播放鉴权:" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " "
<< args._param_strs;
DebugL << "RTSP用户" << user_name
<< (must_no_encrypt ? " Base64" : " MD5")
<< " 方式登录";
string user = user_name;
//假设我们异步读取数据库
if (user == "test0") {
//假设数据库保存的是明文
invoker(false, "pwd0");
return;
}
//其他用户密码跟用户名一致
invoker(false,user);
});
if (user == "test1") {
//假设数据库保存的是密文
auto encrypted_pwd = MD5(
user + ":" + REALM + ":" +
"pwd1").hexdigest();
invoker(true, encrypted_pwd);
return;
}
if (user == "test2" && must_no_encrypt) {
//假设登录的是test2,并且以base64方式登录此时我们提供加密密码那么会导致认证失败
//可以通过这个方式屏蔽base64这种不安全的加密方式
invoker(true, "pwd2");
return;
}
//其他用户密码跟用户名一致
invoker(false, user);
});
//监听rtsp/rtmp推流事件返回结果告知是否有推流权限
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//监听rtsp/rtmp推流事件返回结果告知是否有推流权限
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish,
[](BroadcastMediaPublishArgs) {
DebugL << "推流鉴权:" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " "
<< args._param_strs;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//监听rtsp/rtsps/rtmp/http-flv播放事件返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权)
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPlayed,[](BroadcastMediaPlayedArgs){
DebugL << "播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//监听rtsp/rtsps/rtmp/http-flv播放事件返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权)
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPlayed,
[](BroadcastMediaPlayedArgs) {
DebugL << "播放鉴权:" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " "
<< args._param_strs;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//shell登录事件通过shell可以登录进服务器执行一些命令
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){
DebugL << "shell login:" << user_name << " " << passwd;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//shell登录事件通过shell可以登录进服务器执行一些命令
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastShellLogin,
[](BroadcastShellLoginArgs) {
DebugL << "shell login:" << user_name << " "
<< passwd;
invoker("");//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
//监听rtsp、rtmp源注册或注销事件此处用于测试rtmp保存为flv录像保存在http根目录下
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){
if(schema == RTMP_SCHEMA && app == "live"){
lock_guard<mutex> lck(s_mtxFlvRecorder);
if(bRegist){
DebugL << "开始录制RTMP" << schema << " " << vhost << " " << app << " " << stream;
GET_CONFIG(string,http_root,Http::kRootPath);
auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv";
FlvRecorder::Ptr recorder(new FlvRecorder);
try{
recorder->startRecord(EventPollerPool::Instance().getPoller(),dynamic_pointer_cast<RtmpMediaSource>(sender.shared_from_this()),path);
s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder;
}catch(std::exception &ex){
WarnL << ex.what();
}
}else{
s_mapFlvRecorder.erase(vhost + "/" + app + "/" + stream);
}
}
});
//监听rtsp、rtmp源注册或注销事件此处用于测试rtmp保存为flv录像保存在http根目录下
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
[](BroadcastMediaChangedArgs) {
if (schema == RTMP_SCHEMA && app == "live") {
lock_guard<mutex> lck(s_mtxFlvRecorder);
if (bRegist) {
DebugL << "开始录制RTMP" << schema << " "
<< vhost << " " << app << " "
<< stream;
GET_CONFIG(string, http_root,
Http::kRootPath);
auto path = http_root + "/" + vhost + "/" +
app + "/" + stream + "_" +
to_string(time(NULL)) + ".flv";
FlvRecorder::Ptr recorder(new FlvRecorder);
try {
recorder->startRecord(
EventPollerPool::Instance().getPoller(),
dynamic_pointer_cast<RtmpMediaSource>(
sender.shared_from_this()),
path);
s_mapFlvRecorder[vhost + "/" + app +
"/" +
stream] = recorder;
} catch (std::exception &ex) {
WarnL << ex.what();
}
} else {
s_mapFlvRecorder.erase(
vhost + "/" + app + "/" + stream);
}
}
});
//监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){
/**
*
* ZLMediaKit会把其立即转发给播放器(55)
*/
DebugL << "未找到流事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ;
});
//监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream,
[](BroadcastNotFoundStreamArgs) {
/**
*
* ZLMediaKit会把其立即转发给播放器(55)
*/
DebugL << "未找到流事件:" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " "
<< args._param_strs;
});
//监听播放或推流结束时消耗流量事件
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){
DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs
<< "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "" ;
//监听播放或推流结束时消耗流量事件
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastFlowReport,
[](BroadcastFlowReportArgs) {
DebugL << "播放器(推流器)断开连接事件:" << args._schema << " "
<< args._vhost << " " << args._app << " "
<< args._streamid << " " << args._param_strs
<< "\r\n使用流量:" << totalBytes
<< " bytes,连接时长:" << totalDuration << "";
});
});
}, nullptr);
}, nullptr);
}
#if !defined(SIGHUP)
#define SIGHUP 1
@ -296,6 +342,7 @@ static int do_main(string ini_file) {
TcpServer::Ptr rtspSSLSrv(new TcpServer());
rtspSSLSrv->start<RtspSessionWithSSL>(rtspsPort);//默认322
initEvent();
//服务器支持动态切换端口(不影响现有连接)
NoticeCenter::Instance().addListener(ReloadConfigTag,Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){
//重新创建服务器

View File

@ -9,7 +9,7 @@ else()
file(GLOB MediaServer_src_list ./*.cpp ./*.h)
endif()
message(STATUS ${MediaServer_src_list})
#message(STATUS ${MediaServer_src_list})
add_executable(MediaServer ${MediaServer_src_list})

View File

@ -449,6 +449,8 @@ void installWebApi() {
const string &app,
const string &stream,
const string &url,
bool enable_rtsp,
bool enable_rtmp,
bool enable_hls,
bool enable_mp4,
int rtp_type,
@ -461,7 +463,7 @@ void installWebApi() {
return;
}
//添加拉流代理
PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_hls,enable_mp4));
PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_rtsp,enable_rtmp,enable_hls,enable_mp4));
s_proxyMap[key] = player;
//指定RTP over TCP(播放rtsp时有效)
@ -484,16 +486,18 @@ void installWebApi() {
};
//动态添加rtsp/rtmp拉流代理
//测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&stream=0&url=rtmp://127.0.0.1/live/obs
//测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs
API_REGIST_INVOKER(api,addStreamProxy,{
CHECK_SECRET();
CHECK_ARGS("vhost","app","stream","url");
CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp");
addStreamProxy(allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["url"],
allArgs["enable_hls"],
allArgs["enable_mp4"],
allArgs["enable_rtsp"],/* 是否rtsp转发 */
allArgs["enable_rtmp"],/* 是否rtmp转发 */
allArgs["enable_hls"],/* 是否hls转发 */
allArgs["enable_mp4"],/* 是否MP4录制 */
allArgs["rtp_type"],
[invoker,val,headerOut](const SockException &ex,const string &key){
if(ex){
@ -612,7 +616,7 @@ void installWebApi() {
#if !defined(_WIN32)
API_REGIST_INVOKER(hook,on_stream_not_found,{
API_REGIST_INVOKER(hook,on_stream_not_found_ffmpeg,{
//媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流
CHECK_SECRET();
CHECK_ARGS("vhost","app","stream");
@ -640,8 +644,7 @@ void installWebApi() {
invoker("200 OK", headerOut, val.toStyledString());
});
});
#else
#endif//!defined(_WIN32)
API_REGIST_INVOKER(hook,on_stream_not_found,{
//媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流
@ -652,9 +655,11 @@ void installWebApi() {
allArgs["app"],
allArgs["stream"],
/** 支持rtsp和rtmp方式拉流 rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/
"rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov",//rtmp://live.hkstv.hk.lxdns.com/live/hks2
false,
false,
"rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov",
true,/* 开启rtsp转发 */
true,/* 开启rtmp转发 */
true,/* 开启hls转发 */
false,/* 禁用MP4录制 */
0,//rtp over tcp方式拉流
[invoker,val,headerOut](const SockException &ex,const string &key){
if(ex){
@ -666,7 +671,6 @@ void installWebApi() {
invoker("200 OK", headerOut, val.toStyledString());
});
});
#endif // !defined(_WIN32)
API_REGIST(hook,on_record_mp4,{
//录制mp4分片完毕事件

View File

@ -41,9 +41,11 @@ DevChannel::DevChannel(const string &strVhost,
const string &strApp,
const string &strId,
float fDuration,
bool bEanbleRtsp,
bool bEanbleRtmp,
bool bEanbleHls,
bool bEnableMp4) :
MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleHls, bEnableMp4) {}
MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleRtsp, bEanbleRtmp, bEanbleHls, bEnableMp4) {}
DevChannel::~DevChannel() {}
@ -101,7 +103,7 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32
} else {
prefixeSize = 0;
}
inputFrame(std::make_shared<H264FrameNoCopyAble>((char *)pcData,iDataLen,dts,pts,prefixeSize));
inputFrame(std::make_shared<H264FrameNoCacheAble>((char *)pcData,iDataLen,dts,pts,prefixeSize));
}
void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) {
@ -117,12 +119,12 @@ void DevChannel::inputAAC(const char *pcDataWithoutAdts,int iDataLen, uint32_t u
uiStamp = (uint32_t)_aTicker[1].elapsedTime();
}
if(pcAdtsHeader + 7 == pcDataWithoutAdts){
inputFrame(std::make_shared<AACFrameNoCopyAble>((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7));
inputFrame(std::make_shared<AACFrameNoCacheAble>((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7));
} else {
char *dataWithAdts = new char[iDataLen + 7];
memcpy(dataWithAdts,pcAdtsHeader,7);
memcpy(dataWithAdts + 7 , pcDataWithoutAdts , iDataLen);
inputFrame(std::make_shared<AACFrameNoCopyAble>(dataWithAdts,iDataLen + 7,uiStamp,7));
inputFrame(std::make_shared<AACFrameNoCacheAble>(dataWithAdts,iDataLen + 7,uiStamp,7));
delete [] dataWithAdts;
}
}

View File

@ -74,6 +74,8 @@ public:
const string &strApp,
const string &strId,
float fDuration = 0,
bool bEanbleRtsp = true,
bool bEanbleRtmp = true,
bool bEanbleHls = true,
bool bEnableMp4 = false);

View File

@ -241,9 +241,17 @@ void MediaInfo::parse(const string &url){
}
void MediaSourceEvent::onNoneReader(MediaSource &sender){
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId();
//没有任何读取器消费该源,表明该源可以关闭了
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender);
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId();
weak_ptr<MediaSource> weakPtr = sender.shared_from_this();
//异步广播该事件防止同步调用sender.close()导致在接收rtp或rtmp包时清空包缓存等操作
EventPollerPool::Instance().getPoller()->async([weakPtr](){
auto strongPtr = weakPtr.lock();
if(strongPtr){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,*strongPtr);
}
},false);
}

View File

@ -39,12 +39,18 @@ public:
const string &strApp,
const string &strId,
float dur_sec = 0.0,
bool bEanbleRtsp = true,
bool bEanbleRtmp = true,
bool bEanbleHls = true,
bool bEnableMp4 = false){
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost,strApp,strId,std::make_shared<TitleMete>(dur_sec));
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost,strApp,strId,std::make_shared<TitleSdp>(dur_sec));
bool bEnableMp4 = false
){
if (bEanbleRtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, strApp, strId, std::make_shared<TitleMete>(dur_sec));
}
if (bEanbleRtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, strApp, strId, std::make_shared<TitleSdp>(dur_sec));
}
_record = std::make_shared<MediaRecorder>(vhost,strApp,strId,bEanbleHls,bEnableMp4);
}
virtual ~MultiMediaSourceMuxer(){}
@ -54,8 +60,12 @@ public:
* @param track
*/
void addTrack(const Track::Ptr & track) {
_rtmp->addTrack(track);
_rtsp->addTrack(track);
if(_rtmp){
_rtmp->addTrack(track);
}
if(_rtsp){
_rtsp->addTrack(track);
}
_record->addTrack(track);
}
@ -64,8 +74,12 @@ public:
* @param frame
*/
void inputFrame(const Frame::Ptr &frame) override {
_rtmp->inputFrame(frame);
_rtsp->inputFrame(frame);
if(_rtmp) {
_rtmp->inputFrame(frame);
}
if(_rtsp) {
_rtsp->inputFrame(frame);
}
_record->inputFrame(frame);
}
@ -74,8 +88,12 @@ public:
* @param listener
*/
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_rtmp->setListener(listener);
_rtsp->setListener(listener);
if(_rtmp) {
_rtmp->setListener(listener);
}
if(_rtsp) {
_rtsp->setListener(listener);
}
}
/**
@ -83,11 +101,13 @@ public:
* @return
*/
int readerCount() const{
return _rtsp->readerCount() + _rtmp->readerCount();
return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0);
}
void setTimeStamp(uint32_t stamp){
_rtsp->setTimeStamp(stamp);
if(_rtsp){
_rtsp->setTimeStamp(stamp);
}
}
private:
RtmpMediaSourceMuxer::Ptr _rtmp;

View File

@ -16,7 +16,9 @@ namespace mediakit{
string FindField(const char *buf, const char *start, const char *end, int bufSize = 0);
struct StrCaseCompare {
bool operator()(const string &__x, const string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; }
bool operator()(const string &__x, const string &__y) const {
return strcasecmp(__x.data(), __y.data()) < 0;
}
};
@ -25,17 +27,19 @@ class StrCaseMap : public multimap<string, string, StrCaseCompare>{
typedef multimap<string, string, StrCaseCompare> Super ;
StrCaseMap() = default;
~StrCaseMap() = default;
string &operator[](const string &key){
auto it = find(key);
template <class K>
string &operator[](K &&k){
auto it = find(std::forward<K>(k));
if(it == end()){
it = Super::emplace(key,"");
it = Super::emplace(std::forward<K>(k),"");
}
return it->second;
}
template <class K,class V>
void emplace(K &&k , V &&v) {
auto it = find(k);
auto it = find(std::forward<K>(k));
if(it != end()){
return;
}

View File

@ -161,11 +161,13 @@ namespace Rtsp {
const string kAuthBasic = RTSP_FIELD"authBasic";
const string kHandshakeSecond = RTSP_FIELD"handshakeSecond";
const string kKeepAliveSecond = RTSP_FIELD"keepAliveSecond";
const string kDirectProxy = RTSP_FIELD"directProxy";;
onceToken token([](){
//默认Md5方式认证
mINI::Instance()[kAuthBasic] = 0;
mINI::Instance()[kHandshakeSecond] = 15;
mINI::Instance()[kKeepAliveSecond] = 15;
mINI::Instance()[kDirectProxy] = 1;
},nullptr);
} //namespace Rtsp

View File

@ -202,6 +202,13 @@ extern const string kAuthBasic;
extern const string kHandshakeSecond;
//维持链接超时时间默认15秒
extern const string kKeepAliveSecond;
//rtsp拉流代理是否直接代理
//直接代理后支持任意编码格式但是会导致GOP缓存无法定位到I帧可能会导致开播花屏
//并且如果是tcp方式拉流如果rtp大于mtu会导致无法使用udp方式代理
//假定您的拉流源地址不是264或265或AAC那么你可以使用直接代理的方式来支持rtsp代理
//默认开启rtsp直接代理rtmp由于没有这些问题是强制开启直接代理的
extern const string kDirectProxy;
} //namespace Rtsp
////////////RTMP服务器配置///////////

View File

@ -105,11 +105,11 @@ public:
uint32_t iPrefixSize = 7;
} ;
class AACFrameNoCopyAble : public FrameNoCopyAble {
class AACFrameNoCacheAble : public FrameNoCacheAble {
public:
typedef std::shared_ptr<AACFrameNoCopyAble> Ptr;
typedef std::shared_ptr<AACFrameNoCacheAble> Ptr;
AACFrameNoCopyAble(char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){
AACFrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){
_ptr = ptr;
_size = size;
_dts = dts;

View File

@ -118,8 +118,20 @@ Track::Ptr Factory::getTrackByCodecId(CodecId codecId) {
RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
GET_CONFIG(uint32_t,audio_mtu,Rtp::kAudioMtuSize);
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize);
// ssrc不冲突即可
uint32_t ssrc = ((uint64_t) sdp.get()) & 0xFFFFFFFF;
// ssrc不冲突即可,可以为任意的32位整形
static atomic<uint32_t> s_ssrc(0);
uint32_t ssrc = s_ssrc++;
if(!ssrc){
//ssrc不能为0
ssrc = 1;
}
if(sdp->getTrackType() == TrackVideo){
//视频的ssrc是偶数方便调试
ssrc = 2 * ssrc;
}else{
//音频ssrc是奇数
ssrc = 2 * ssrc + 1;
}
auto mtu = (sdp->getTrackType() == TrackVideo ? video_mtu : audio_mtu);
auto sample_rate = sdp->getSampleRate();
auto pt = sdp->getPlayloadType();

42
src/Extension/Frame.cpp Normal file
View File

@ -0,0 +1,42 @@
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "Frame.h"
using namespace std;
using namespace toolkit;
namespace mediakit{
Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){
if(frame->cacheAble()){
return frame;
}
return std::make_shared<FrameCacheAble>(frame);
}
}//namespace mediakit

View File

@ -77,7 +77,7 @@ public:
/**
*
*/
class Frame : public Buffer, public CodecInfo{
class Frame : public Buffer, public CodecInfo {
public:
typedef std::shared_ptr<Frame> Ptr;
virtual ~Frame(){}
@ -116,6 +116,17 @@ public:
* @return
*/
virtual bool keyFrame() const = 0;
/**
*
*/
virtual bool cacheAble() const { return true; }
/**
* frame
* @return
*/
static Ptr getCacheAbleFrame(const Ptr &frame);
};
/**
@ -281,9 +292,12 @@ private:
map<void *,FrameWriterInterface::Ptr> _delegateMap;
};
class FrameNoCopyAble : public Frame{
/**
* Frame接口包装指针便使ZLMediaKit
*/
class FrameFromPtr : public Frame{
public:
typedef std::shared_ptr<FrameNoCopyAble> Ptr;
typedef std::shared_ptr<FrameFromPtr> Ptr;
char *data() const override{
return _ptr;
}
@ -305,7 +319,6 @@ public:
uint32_t prefixSize() const override{
return _prefixSize;
}
protected:
char *_ptr;
uint32_t _size;
@ -314,6 +327,81 @@ protected:
uint32_t _prefixSize;
};
/**
* DevChannel类中有用到
* 使
* ZLMediaKit是同步对帧数据进行使用和处理的
*
* Frame::getCacheAbleFrame方法拷贝一个可缓存的帧
*/
class FrameNoCacheAble : public FrameFromPtr{
public:
typedef std::shared_ptr<FrameNoCacheAble> Ptr;
/**
*
* @return
*/
bool cacheAble() const override {
return false;
}
};
/**
*
* @see FrameNoCacheAble
*/
class FrameCacheAble : public FrameFromPtr {
public:
typedef std::shared_ptr<FrameCacheAble> Ptr;
FrameCacheAble(const Frame::Ptr &frame){
if(frame->cacheAble()){
_frame = frame;
_ptr = frame->data();
}else{
_buffer = std::make_shared<BufferRaw>();
_buffer->assign(frame->data(),frame->size());
_ptr = _buffer->data();
}
_size = frame->size();
_dts = frame->dts();
_pts = frame->pts();
_prefixSize = frame->prefixSize();
_trackType = frame->getTrackType();
_codec = frame->getCodecId();
_key = frame->keyFrame();
}
virtual ~FrameCacheAble() = default;
/**
*
* @return
*/
bool cacheAble() const override {
return true;
}
TrackType getTrackType() const override{
return _trackType;
}
CodecId getCodecId() const override{
return _codec;
}
bool keyFrame() const override{
return _key;
}
private:
Frame::Ptr _frame;
BufferRaw::Ptr _buffer;
TrackType _trackType;
CodecId _codec;
bool _key;
};
}//namespace mediakit

View File

@ -92,11 +92,16 @@ public:
};
class H264FrameNoCopyAble : public FrameNoCopyAble {
/**
* H264类
* Frame类
* DevChannel中有使用
*/
class H264FrameNoCacheAble : public FrameNoCacheAble {
public:
typedef std::shared_ptr<H264FrameNoCopyAble> Ptr;
typedef std::shared_ptr<H264FrameNoCacheAble> Ptr;
H264FrameNoCopyAble(char *ptr,uint32_t size,uint32_t dts , uint32_t pts ,int prefixeSize = 4){
H264FrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts , uint32_t pts ,int prefixeSize = 4){
_ptr = ptr;
_size = size;
_dts = dts;
@ -117,17 +122,26 @@ public:
}
};
class H264FrameSubFrame : public H264FrameNoCopyAble{
/**
* H264Frame类中可以有多个帧 0x 00 00 01
* ZLMediaKit会先把这种复合帧split成单个帧然后再处理
* H264FrameSubFrame
*
*/
class H264FrameSubFrame : public H264FrameNoCacheAble{
public:
typedef std::shared_ptr<H264FrameSubFrame> Ptr;
H264FrameSubFrame(const Frame::Ptr &strongRef,
H264FrameSubFrame(const Frame::Ptr &parent_frame,
char *ptr,
uint32_t size,
int prefixeSize) : H264FrameNoCopyAble(ptr,size,strongRef->dts(),strongRef->pts(),prefixeSize){
_strongRef = strongRef;
int prefixeSize) : H264FrameNoCacheAble(ptr,size,parent_frame->dts(),parent_frame->pts(),prefixeSize){
_parent_frame = parent_frame;
}
bool cacheAble() const override {
return _parent_frame->cacheAble();
}
private:
Frame::Ptr _strongRef;
Frame::Ptr _parent_frame;
};
/**

View File

@ -108,6 +108,7 @@ inline void H264RtmpDecoder::onGetH264_l(const char* pcData, int iLen, uint32_t
}
}
inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dts,uint32_t pts) {
#if 1
_h264frame->type = H264_TYPE(pcData[0]);
_h264frame->timeStamp = dts;
_h264frame->ptsStamp = pts;
@ -117,6 +118,11 @@ inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dt
//写入环形缓存
RtmpCodec::inputFrame(_h264frame);
_h264frame = obtainFrame();
#else
//防止内存拷贝这样产生的264帧不会有0x00 00 01头
auto frame = std::make_shared<H264FrameNoCacheAble>((char *)pcData,iLen,dts,pts,0);
RtmpCodec::inputFrame(frame);
#endif
}

View File

@ -121,11 +121,11 @@ public:
};
class H265FrameNoCopyAble : public FrameNoCopyAble {
class H265FrameNoCacheAble : public FrameNoCacheAble {
public:
typedef std::shared_ptr<H265FrameNoCopyAble> Ptr;
typedef std::shared_ptr<H265FrameNoCacheAble> Ptr;
H265FrameNoCopyAble(char *ptr, uint32_t size, uint32_t dts,uint32_t pts, int prefixeSize = 4) {
H265FrameNoCacheAble(char *ptr, uint32_t size, uint32_t dts,uint32_t pts, int prefixeSize = 4) {
_ptr = ptr;
_size = size;
_dts = dts;

View File

@ -918,7 +918,11 @@ void HttpSession::responseDelay(const string &Origin,bool bClose,
headerOther["Access-Control-Allow-Origin"] = Origin;
headerOther["Access-Control-Allow-Credentials"] = "true";
}
const_cast<KeyValue &>(headerOut).insert(headerOther.begin(), headerOther.end());
for (auto &pr : headerOther){
//添加默认http头默认http头不能覆盖用户自定义的头
const_cast<KeyValue &>(headerOut).emplace(pr.first,pr.second);
}
sendResponse(codeOut.data(), headerOut, contentOut);
}
inline void HttpSession::sendNotFound(bool bClose) {

View File

@ -37,6 +37,7 @@ namespace mediakit {
#ifdef ENABLE_MP4V2
MediaReader::MediaReader(const string &strVhost,const string &strApp, const string &strId,const string &filePath ) {
_poller = EventPollerPool::Instance().getPoller();
auto strFileName = filePath;
if(strFileName.empty()){
GET_CONFIG(string,recordPath,Record::kFilePath);
@ -137,7 +138,7 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri
}
_iDuration = MAX(_video_ms,_audio_ms);
_mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost,strApp,strId,_iDuration/1000.0,false, false));
_mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost, strApp, strId, _iDuration / 1000.0, true, true, false, false));
if (_audio_trId != MP4_INVALID_TRACK_ID) {
AACTrack::Ptr track = std::make_shared<AACTrack>(_strAacCfg);
_mediaMuxer->addTrack(track);
@ -164,7 +165,7 @@ void MediaReader::startReadMP4() {
_timer = std::make_shared<Timer>(sampleMS / 1000.0f,[strongSelf](){
return strongSelf->readSample(0,false);
}, nullptr);
}, _poller);
//先读sampleMS毫秒的数据用于产生MediaSouce
readSample(sampleMS, false);
@ -260,11 +261,11 @@ inline bool MediaReader::readAudioSample(int iTimeInc,bool justSeekSyncFrame) {
}
inline void MediaReader::writeH264(uint8_t *pucData,int iLen,uint32_t dts,uint32_t pts) {
_mediaMuxer->inputFrame(std::make_shared<H264FrameNoCopyAble>((char*)pucData,iLen,dts,pts));
_mediaMuxer->inputFrame(std::make_shared<H264FrameNoCacheAble>((char*)pucData,iLen,dts,pts));
}
inline void MediaReader::writeAAC(uint8_t *pucData,int iLen,uint32_t uiStamp) {
_mediaMuxer->inputFrame(std::make_shared<AACFrameNoCopyAble>((char*)pucData,iLen,uiStamp));
_mediaMuxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char*)pucData,iLen,uiStamp));
}
inline MP4SampleId MediaReader::getVideoSampleId(int iTimeInc ) {

View File

@ -132,6 +132,7 @@ private:
Ticker _alive;
recursive_mutex _mtx;
Timer::Ptr _timer;
EventPoller::Ptr _poller;
#endif //ENABLE_MP4V2
};

View File

@ -25,7 +25,7 @@
*/
#ifdef ENABLE_MP4V2
#include <ctime>
#include <sys/stat.h>
#include "Common/config.h"
#include "Mp4Maker.h"

View File

@ -70,7 +70,12 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
if(_frameCached.size() != 1){
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame){
merged.append(frame->data(),frame->size());
if(frame->prefixSize()){
merged.append(frame->data(),frame->size());
} else{
merged.append("\x00\x00\x00\x01",4);
merged.append(frame->data(),frame->size());
}
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
@ -78,7 +83,7 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
mpeg_ts_write(_context, it->second, back->keyFrame() ? 0x0001 : 0, back->pts() * 90LL, back->dts() * 90LL, merged_frame->data(), merged_frame->size());
_frameCached.clear();
}
_frameCached.emplace_back(frame);
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
break;
default: {

View File

@ -40,12 +40,23 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const st
ptr->teardown();
};
string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsps",prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtspPlayerImp>(poller),releasePlayer);
}
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
}
if (strcasecmp("rtmps",prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtmpPlayerImp>(poller),releasePlayer);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(poller),releasePlayer);
}
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
}

View File

@ -216,7 +216,7 @@ public:
void setMediaSouce(const MediaSource::Ptr & src) override {
if (_parser) {
return _parser->setMediaSouce(src);
_parser->setMediaSouce(src);
}
_pMediaSrc = src;
}

View File

@ -65,6 +65,8 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00,
PlayerProxy::PlayerProxy(const string &strVhost,
const string &strApp,
const string &strSrc,
bool bEnableRtsp,
bool bEnableRtmp,
bool bEnableHls,
bool bEnableMp4,
int iRetryCount,
@ -72,6 +74,8 @@ PlayerProxy::PlayerProxy(const string &strVhost,
_strVhost = strVhost;
_strApp = strApp;
_strSrc = strSrc;
_bEnableRtsp = bEnableRtsp;
_bEnableRtmp = bEnableRtmp;
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
_iRetryCount = iRetryCount;
@ -126,13 +130,30 @@ void PlayerProxy::play(const string &strUrlTmp) {
}
});
MediaPlayer::play(strUrlTmp);
MediaSource::Ptr mediaSource;
if(dynamic_pointer_cast<RtspPlayer>(_parser)){
//rtsp拉流
GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy);
if(directProxy && _bEnableRtsp){
mediaSource = std::make_shared<RtspMediaSource>(_strVhost,_strApp,_strSrc);
}
}else if(dynamic_pointer_cast<RtmpPlayer>(_parser)){
//rtmp拉流
if(_bEnableRtmp){
mediaSource = std::make_shared<RtmpMediaSource>(_strVhost,_strApp,_strSrc);
}
}
if(mediaSource){
setMediaSouce(mediaSource);
mediaSource->setListener(shared_from_this());
}
}
PlayerProxy::~PlayerProxy() {
_timer.reset();
}
void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
auto iTaskId = reinterpret_cast<uint64_t>(this);
auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000,60*1000));
weak_ptr<PlayerProxy> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() {
@ -146,8 +167,13 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
return false;
}, getPoller());
}
int PlayerProxy::readerCount(){
return (_mediaMuxer ? _mediaMuxer->readerCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
}
bool PlayerProxy::close(MediaSource &sender,bool force) {
if(!_mediaMuxer || (!force && _mediaMuxer->readerCount() != 0)){
if(!force && readerCount() != 0){
return false;
}
@ -157,6 +183,7 @@ bool PlayerProxy::close(MediaSource &sender,bool force) {
auto stronSelf = weakSlef.lock();
if (stronSelf) {
stronSelf->_mediaMuxer.reset();
stronSelf->setMediaSouce(nullptr);
stronSelf->teardown();
if(stronSelf->_onClose){
stronSelf->_onClose();
@ -185,7 +212,7 @@ public:
auto iAudioIndex = frame->stamp() / MUTE_ADTS_DATA_MS;
if(_iAudioIndex != iAudioIndex){
_iAudioIndex = iAudioIndex;
auto aacFrame = std::make_shared<AACFrameNoCopyAble>((char *)MUTE_ADTS_DATA,
auto aacFrame = std::make_shared<AACFrameNoCacheAble>((char *)MUTE_ADTS_DATA,
MUTE_ADTS_DATA_LEN,
_iAudioIndex * MUTE_ADTS_DATA_MS);
FrameRingInterfaceDelegate::inputFrame(aacFrame);
@ -197,7 +224,16 @@ private:
};
void PlayerProxy::onPlaySuccess() {
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost,_strApp,_strSrc,getDuration(),_bEnableHls,_bEnableMp4));
if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) {
//rtsp拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4));
} else if (dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc)) {
//rtmp拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4));
} else {
//其他拉流代理
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4));
}
_mediaMuxer->setListener(shared_from_this());
auto videoTrack = getTrack(TrackVideo,false);

View File

@ -49,6 +49,8 @@ public:
PlayerProxy(const string &strVhost,
const string &strApp,
const string &strSrc,
bool bEnableRtsp = true,
bool bEnableRtmp = true,
bool bEnableHls = true,
bool bEnableMp4 = false,
int iRetryCount = -1,
@ -84,9 +86,12 @@ private:
void onNoneReader(MediaSource &sender) override;
void rePlay(const string &strUrl,int iFailedCnt);
void onPlaySuccess();
int readerCount() ;
private:
bool _bEnableHls;
bool _bEnableMp4;
bool _bEnableRtsp;
bool _bEnableRtmp;
bool _bEnableHls;
bool _bEnableMp4;
int _iRetryCount;
MultiMediaSourceMuxer::Ptr _mediaMuxer;
string _strVhost;

View File

@ -44,12 +44,23 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller,
ptr->teardown();
};
string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsps",prefix.data()) == 0) {
return PusherBase::Ptr(new TcpClientWithSSL<RtspPusher>(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
if (strcasecmp("rtmps",prefix.data()) == 0) {
return PusherBase::Ptr(new TcpClientWithSSL<RtmpPusher>(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
}
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}

View File

@ -38,8 +38,8 @@
#include "Rtmp.h"
#include "RtmpMediaSource.h"
#include "RtmpDemuxer.h"
#include "MediaFile/MediaRecorder.h"
#include "Rtsp/RtspMediaSourceMuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
using namespace std;
using namespace toolkit;
@ -54,49 +54,53 @@ public:
const string &id,
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0):RtmpMediaSource(vhost, app, id,ringSize){
_recorder = std::make_shared<MediaRecorder>(vhost, app, id, bEnableHls, bEnableMp4);
_rtmpDemuxer = std::make_shared<RtmpDemuxer>();
int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
_demuxer = std::make_shared<RtmpDemuxer>();
}
virtual ~RtmpToRtspMediaSource(){}
void onGetMetaData(const AMFValue &metadata) override {
_rtmpDemuxer = std::make_shared<RtmpDemuxer>(metadata);
_demuxer = std::make_shared<RtmpDemuxer>(metadata);
RtmpMediaSource::onGetMetaData(metadata);
}
void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos) override {
_rtmpDemuxer->inputRtmp(pkt);
if(!_rtspMuxer && _rtmpDemuxer->isInited(2000)){
_rtspMuxer = std::make_shared<RtspMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
std::make_shared<TitleSdp>(_rtmpDemuxer->getDuration()));
for (auto &track : _rtmpDemuxer->getTracks(false)){
_rtspMuxer->addTrack(track);
_recorder->addTrack(track);
track->addDelegate(_rtspMuxer);
track->addDelegate(_recorder);
_demuxer->inputRtmp(pkt);
if(!_muxer && _demuxer->isInited(2000)){
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
_demuxer->getDuration(),
true,//转rtsp
false,//不重复生成rtmp
_bEnableHls,
_bEnableMp4);
for (auto &track : _demuxer->getTracks(false)){
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_rtspMuxer->setListener(_listener);
_muxer->setListener(_listener);
}
RtmpMediaSource::onWrite(pkt,key_pos);
}
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override {
RtmpMediaSource::setListener(listener);
if(_rtspMuxer){
_rtspMuxer->setListener(listener);
if(_muxer){
_muxer->setListener(listener);
}
}
int readerCount() override {
return RtmpMediaSource::readerCount() + (_rtspMuxer ? _rtspMuxer->readerCount() : 0);
return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
}
private:
RtmpDemuxer::Ptr _rtmpDemuxer;
RtspMediaSourceMuxer::Ptr _rtspMuxer;
MediaRecorder::Ptr _recorder;
RtmpDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls;
bool _bEnableMp4;
};
} /* namespace mediakit */

View File

@ -232,6 +232,31 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
sendSetup(0);
}
//有必要的情况下创建udp端口
void RtspPlayer::createUdpSockIfNecessary(int track_idx){
auto &rtpSockRef = _apRtpSock[track_idx];
auto &rtcpSockRef = _apRtcpSock[track_idx];
if(!rtpSockRef){
rtpSockRef.reset(new Socket(getPoller()));
//rtp随机端口
if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) {
rtpSockRef.reset();
throw std::runtime_error("open rtp sock failed");
}
}
if(!rtcpSockRef){
rtcpSockRef.reset(new Socket(getPoller()));
//rtcp端口为rtp端口+1目的是为了兼容某些服务器其实更推荐随机端口
if (!rtcpSockRef->bindUdpSock(rtpSockRef->get_local_port() + 1, get_local_ip().data())) {
rtcpSockRef.reset();
throw std::runtime_error("open rtcp sock failed");
}
}
}
//发送SETUP命令
void RtspPlayer::sendSetup(unsigned int trackIndex) {
_onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex);
@ -247,16 +272,7 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) {
}
break;
case Rtsp::RTP_UDP: {
_apRtpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apRtpSock[trackIndex].reset();
throw std::runtime_error("open rtp sock err");
}
_apRtcpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apRtcpSock[trackIndex]->bindUdpSock(_apRtpSock[trackIndex]->get_local_port() + 1, get_local_ip().data())) {
_apRtcpSock[trackIndex].reset();
throw std::runtime_error("open rtcp sock err");
}
createUdpSockIfNecessary(trackIndex);
sendRtspRequest("SETUP",baseUrl,{"Transport",
StrPrinter << "RTP/AVP;unicast;client_port="
<< _apRtpSock[trackIndex]->get_local_port() << "-"
@ -280,7 +296,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}
auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){
_eType = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
_eType = Rtsp::RTP_MULTICAST;
@ -314,7 +330,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data());
}
} else {
//udp单播
createUdpSockIfNecessary(uiTrackIndex);
//udp单播
struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(rtp_port);
rtpto.sin_family = AF_INET;

View File

@ -93,6 +93,11 @@ protected:
* @param uiLen
*/
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
/////////////TcpClient override/////////////
void onConnect(const SockException &err) override;
void onRecv(const Buffer::Ptr &pBuf) override;
void onErr(const SockException &ex) override;
private:
void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track);
void onPlayResult_l(const SockException &ex);
@ -102,10 +107,6 @@ private:
int getTrackIndexByTrackType(TrackType trackType) const;
void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType);
void onConnect(const SockException &err) override;
void onRecv(const Buffer::Ptr &pBuf) override;
void onErr(const SockException &ex) override;
void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex);
void handleResDESCRIBE(const Parser &parser);
bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr);
@ -120,6 +121,7 @@ private:
void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap());
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header);
void sendReceiverReport(bool overTcp,int iTrackIndex);
void createUdpSockIfNecessary(int track_idx);
private:
string _strUrl;
SdpParser _sdpParser;

View File

@ -242,6 +242,19 @@ bool RtspPusher::handleAuthenticationFailure(const string &paramsStr) {
return false;
}
//有必要的情况下创建udp端口
void RtspPusher::createUdpSockIfNecessary(int track_idx){
auto &rtpSockRef = _apUdpSock[track_idx];
if(!rtpSockRef){
rtpSockRef.reset(new Socket(getPoller()));
//rtp随机端口
if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) {
rtpSockRef.reset();
throw std::runtime_error("open rtp sock failed");
}
}
}
void RtspPusher::sendSetup(unsigned int trackIndex) {
_onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex);
auto &track = _aTrackInfo[trackIndex];
@ -252,11 +265,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) {
}
break;
case Rtsp::RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset();
throw std::runtime_error("open udp sock err");
}
createUdpSockIfNecessary(trackIndex);
int port = _apUdpSock[trackIndex]->get_local_port();
sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1});
}
@ -266,6 +275,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) {
}
}
void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) {
if (parser.Url() != "200") {
throw std::runtime_error(
@ -278,7 +288,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex)
}
auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){
_eType = Rtsp::RTP_TCP;
string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
_aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data());
@ -286,19 +296,15 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex)
throw std::runtime_error("SETUP rtsp pusher can not support multicast!");
}else{
_eType = Rtsp::RTP_UDP;
createUdpSockIfNecessary(uiTrackIndex);
const char *strPos = "server_port=" ;
auto port_str = FindField((strTransport + ";").data(), strPos, ";");
uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data());
auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){
pUdpSockRef.reset(new Socket(getPoller()));
}
struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(port);
rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
_apUdpSock[uiTrackIndex]->setSendPeerAddr((struct sockaddr *)&(rtpto));
}
RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP);

View File

@ -65,6 +65,8 @@ private:
void sendRtpPacket(const RtpPacket::Ptr & pkt) ;
void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" );
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = "");
void createUdpSockIfNecessary(int track_idx);
private:
//rtsp鉴权相关
string _rtspMd5Nonce;

View File

@ -29,10 +29,8 @@
#include "Rtmp/amf.h"
#include "RtspMediaSource.h"
#include "MediaFile/MediaRecorder.h"
#include "Rtmp/RtmpMediaSource.h"
#include "RtspDemuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
using namespace toolkit;
@ -48,31 +46,34 @@ public:
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) {
_recorder = std::make_shared<MediaRecorder>(vhost, app, id, bEnableHls, bEnableMp4);
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
}
virtual ~RtspToRtmpMediaSource() {}
virtual void onGetSDP(const string &strSdp) override {
_rtspDemuxer = std::make_shared<RtspDemuxer>(strSdp);
_demuxer = std::make_shared<RtspDemuxer>(strSdp);
RtspMediaSource::onGetSDP(strSdp);
}
virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override {
if (_rtspDemuxer) {
bKeyPos = _rtspDemuxer->inputRtp(rtp);
if (!_rtmpMuxer && _rtspDemuxer->isInited(2000)) {
_rtmpMuxer = std::make_shared<RtmpMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
std::make_shared<TitleMete>(_rtspDemuxer->getDuration()));
for (auto &track : _rtspDemuxer->getTracks(false)) {
_rtmpMuxer->addTrack(track);
_recorder->addTrack(track);
track->addDelegate(_rtmpMuxer);
track->addDelegate(_recorder);
if (_demuxer) {
bKeyPos = _demuxer->inputRtp(rtp);
if (!_muxer && _demuxer->isInited(2000)) {
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
_demuxer->getDuration(),
false,//不重复生成rtsp
true,//转rtmp
_bEnableHls,
_bEnableMp4);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_rtmpMuxer->setListener(_listener);
_muxer->setListener(_listener);
}
}
RtspMediaSource::onWrite(rtp, bKeyPos);
@ -80,17 +81,18 @@ public:
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override {
RtspMediaSource::setListener(listener);
if(_rtmpMuxer){
_rtmpMuxer->setListener(listener);
if(_muxer){
_muxer->setListener(listener);
}
}
int readerCount() override {
return RtspMediaSource::readerCount() + (_rtmpMuxer ? _rtmpMuxer->readerCount() : 0);
return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
}
private:
RtspDemuxer::Ptr _rtspDemuxer;
RtmpMediaSourceMuxer::Ptr _rtmpMuxer;
MediaRecorder::Ptr _recorder;
RtspDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls;
bool _bEnableMp4;
};
} /* namespace mediakit */

View File

@ -56,49 +56,50 @@ onceToken token1([](){
}//namespace Http
} // namespace mediakit
void initEventListener(){
static onceToken s_token([](){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){
//const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){
return;
}
//url以"/api/起始说明是http api"
consumed = true;//该http请求已被消费
static onceToken s_token([](){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){
//const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){
return;
}
//url以"/api/起始说明是http api"
consumed = true;//该http请求已被消费
_StrPrinter printer;
////////////////method////////////////////
printer << "\r\nmethod:\r\n\t" << parser.Method();
////////////////url/////////////////
printer << "\r\nurl:\r\n\t" << parser.Url();
////////////////protocol/////////////////
printer << "\r\nprotocol:\r\n\t" << parser.Tail();
///////////////args//////////////////
printer << "\r\nargs:\r\n";
for(auto &pr : parser.getUrlArgs()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
///////////////header//////////////////
printer << "\r\nheader:\r\n";
for(auto &pr : parser.getValues()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
////////////////content/////////////////
printer << "\r\ncontent:\r\n" << parser.Content();
auto contentOut = printer << endl;
_StrPrinter printer;
////////////////method////////////////////
printer << "\r\nmethod:\r\n\t" << parser.Method();
////////////////url/////////////////
printer << "\r\nurl:\r\n\t" << parser.Url();
////////////////protocol/////////////////
printer << "\r\nprotocol:\r\n\t" << parser.Tail();
///////////////args//////////////////
printer << "\r\nargs:\r\n";
for(auto &pr : parser.getUrlArgs()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
///////////////header//////////////////
printer << "\r\nheader:\r\n";
for(auto &pr : parser.getValues()){
printer << "\t" << pr.first << " : " << pr.second << "\r\n";
}
////////////////content/////////////////
printer << "\r\ncontent:\r\n" << parser.Content();
auto contentOut = printer << endl;
////////////////我们测算异步回复,当然你也可以同步回复/////////////////
EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){
HttpSession::KeyValue headerOut;
//你可以自定义header,如果跟默认header重名则会覆盖之
//默认header有:Server,Connection,Date,Content-Type,Content-Length
//请勿覆盖Connection、Content-Length键
//键名覆盖时不区分大小写
headerOut["TestHeader"] = "HeaderValue";
invoker("200 OK",headerOut,contentOut);
});
});
}, nullptr);
////////////////我们测算异步回复,当然你也可以同步回复/////////////////
EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){
HttpSession::KeyValue headerOut;
//你可以自定义header,如果跟默认header重名则会覆盖之
//默认header有:Server,Connection,Date,Content-Type,Content-Length
//请勿覆盖Connection、Content-Length键
//键名覆盖时不区分大小写
headerOut["TestHeader"] = "HeaderValue";
invoker("200 OK",headerOut,contentOut);
});
});
}, nullptr);
}
int main(int argc,char *argv[]){
//设置退出信号处理函数
@ -111,6 +112,7 @@ int main(int argc,char *argv[]){
//加载配置文件,如果配置文件不存在就创建一个
loadIniConfig();
initEventListener();
//加载证书,证书包含公钥和私钥
SSL_Initor::Instance().loadCertificate((exeDir() + "ssl.p12").data());

View File

@ -25,6 +25,7 @@
*/
#include <signal.h>
#include <unistd.h>
#include "Util/util.h"
#include "Util/logger.h"
#include <iostream>
#include "Poller/EventPoller.h"
@ -68,22 +69,27 @@ int main(int argc, char *argv[]) {
WarnL << "没有视频或者视频不是264编码!";
return;
}
SDLDisplayerHelper::Instance().doTask([viedoTrack]() {
std::shared_ptr<H264Decoder> decoder(new H264Decoder);
std::shared_ptr<YuvDisplayer> displayer(new YuvDisplayer);
viedoTrack->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([decoder, displayer](const Frame::Ptr &frame) {
SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() {
AVFrame *pFrame = nullptr;
bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(),
frame->stamp(), &pFrame);
if (flag) {
displayer->displayYUV(pFrame);
}
return true;
});
}));
return true;
});
AnyStorage::Ptr storage(new AnyStorage);
viedoTrack->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([storage](const Frame::Ptr &frame) {
SDLDisplayerHelper::Instance().doTask([frame,storage]() {
auto &decoder = (*storage)["decoder"];
auto &displayer = (*storage)["displayer"];
if(!decoder){
decoder.set<H264Decoder>();
}
if(!displayer){
displayer.set<YuvDisplayer>();
}
AVFrame *pFrame = nullptr;
bool flag = decoder.get<H264Decoder>().inputVideo((unsigned char *) frame->data(), frame->size(), frame->stamp(), &pFrame);
if (flag) {
displayer.get<YuvDisplayer>().displayYUV(pFrame);
}
return true;
});
}));
});

View File

@ -90,7 +90,7 @@ int domain(const string &playUrl, const string &pushUrl) {
//拉一个流生成一个RtmpMediaSource源的名称是"app/stream"
//你也可以以其他方式生成RtmpMediaSource比如说MP4文件请查看test_rtmpPusherMp4.cpp代码
PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller));
PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",true,true,false,false,-1 , poller));
//可以指定rtsp拉流方式支持tcp和udp方式默认tcp
// (*player)[Client::kRtpType] = Rtsp::RTP_UDP;
player->play(playUrl.data());