Merge branch 'master' into research

This commit is contained in:
liuziloong 2020-01-19 15:09:34 +08:00
commit ee2ca4bda5
24 changed files with 577 additions and 243 deletions

@ -1 +1 @@
Subproject commit 72013f5128d8c04bad8b973370da463c311081c0 Subproject commit be08f01869ebf1391245c24c1e328422eb11ab77

View File

@ -70,8 +70,10 @@ API_EXPORT void API_CALL mk_stop_all_server(){
CLEAR_ARR(rtsp_server); CLEAR_ARR(rtsp_server);
CLEAR_ARR(rtmp_server); CLEAR_ARR(rtmp_server);
CLEAR_ARR(http_server); CLEAR_ARR(http_server);
#ifdef ENABLE_RTPPROXY
udpRtpServer = nullptr; udpRtpServer = nullptr;
tcpRtpServer = nullptr; tcpRtpServer = nullptr;
#endif
stopAllTcpServer(); stopAllTcpServer();
} }

3
build_docker_images.sh Normal file
View File

@ -0,0 +1,3 @@
#!/bin/bash
docker build -t gemfield/zlmediakit:20.01-runtime-ubuntu18.04 -f docker/ubuntu18.04/Dockerfile.runtime .
#docker build -t gemfield/zlmediakit:20.01-devel-ubuntu18.04 -f docker/ubuntu18.04/Dockerfile.devel .

View File

@ -9,7 +9,9 @@ EXPOSE 443/tcp
EXPOSE 10000/udp EXPOSE 10000/udp
EXPOSE 10000/tcp EXPOSE 10000/tcp
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
build-essential \ build-essential \
cmake \ cmake \
git \ git \
@ -22,6 +24,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libx264-dev \ libx264-dev \
libfaac-dev \ libfaac-dev \
libmp4v2-dev && \ libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/media RUN mkdir -p /opt/media
@ -36,5 +40,4 @@ RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \
make -j4 make -j4
ENV PATH /opt/media/ZLMediaKit/release/linux/Release/:$PATH ENV PATH /opt/media/ZLMediaKit/release/linux/Release/:$PATH
CMD MediaServer CMD MediaServer

View File

@ -0,0 +1,62 @@
FROM ubuntu:16.04 AS build
#shell,rtmp,rtsp,rtsps,http,https,rtp
EXPOSE 9000/tcp
EXPOSE 1935/tcp
EXPOSE 554/tcp
EXPOSE 322/tcp
EXPOSE 80/tcp
EXPOSE 443/tcp
EXPOSE 10000/udp
EXPOSE 10000/tcp
RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
build-essential \
cmake \
git \
curl \
vim \
ca-certificates \
tzdata \
libssl-dev \
libmysqlclient-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/media
WORKDIR /opt/media
RUN git clone --depth=1 https://github.com/xiongziliang/ZLMediaKit && \
cd ZLMediaKit && git submodule update --init --recursive && \
mkdir -p build release/linux/Release/
WORKDIR /opt/media/ZLMediaKit/build
RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \
make -j4
FROM ubuntu:16.04
LABEL maintainer "Gemfield <gemfield@civilnet.cn>"
RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
vim \
ca-certificates \
tzdata \
libssl-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/*
WORKDIR /opt/media/bin/
COPY --from=build /opt/media/ZLMediaKit/release/linux/Release/MediaServer /opt/media/bin/MediaServer
ENV PATH /opt/media/bin:$PATH
CMD MediaServer

View File

@ -0,0 +1,44 @@
FROM ubuntu:18.04
LABEL maintainer "Gemfield <gemfield@civilnet.cn>"
#shell,rtmp,rtsp,rtsps,http,https,rtp
EXPOSE 9000/tcp
EXPOSE 1935/tcp
EXPOSE 554/tcp
EXPOSE 322/tcp
EXPOSE 80/tcp
EXPOSE 443/tcp
EXPOSE 10000/udp
EXPOSE 10000/tcp
RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
build-essential \
cmake \
git \
curl \
vim \
ca-certificates \
tzdata \
libssl-dev \
libmysqlclient-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/media
WORKDIR /opt/media
RUN git clone --depth=1 https://github.com/xiongziliang/ZLMediaKit && \
cd ZLMediaKit && git submodule update --init --recursive && \
mkdir -p build release/linux/Release/
WORKDIR /opt/media/ZLMediaKit/build
RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \
make -j4
ENV PATH /opt/media/ZLMediaKit/release/linux/Release:$PATH
CMD MediaServer

View File

@ -0,0 +1,62 @@
FROM ubuntu:18.04 AS build
#shell,rtmp,rtsp,rtsps,http,https,rtp
EXPOSE 9000/tcp
EXPOSE 1935/tcp
EXPOSE 554/tcp
EXPOSE 322/tcp
EXPOSE 80/tcp
EXPOSE 443/tcp
EXPOSE 10000/udp
EXPOSE 10000/tcp
RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
build-essential \
cmake \
git \
curl \
vim \
ca-certificates \
tzdata \
libssl-dev \
libmysqlclient-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/media
WORKDIR /opt/media
RUN git clone --depth=1 https://github.com/xiongziliang/ZLMediaKit && \
cd ZLMediaKit && git submodule update --init --recursive && \
mkdir -p build release/linux/Release/
WORKDIR /opt/media/ZLMediaKit/build
RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \
make -j4
FROM ubuntu:18.04
LABEL maintainer "Gemfield <gemfield@civilnet.cn>"
RUN apt-get update && \
DEBIAN_FRONTEND="noninteractive" \
apt-get install -y --no-install-recommends \
vim \
ca-certificates \
tzdata \
libssl-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
apt autoremove -y && \
apt clean -y && \
rm -rf /var/lib/apt/lists/*
WORKDIR /opt/media/bin/
COPY --from=build /opt/media/ZLMediaKit/release/linux/Release/MediaServer /opt/media/bin/MediaServer
ENV PATH /opt/media/bin:$PATH
CMD MediaServer

View File

@ -48,38 +48,10 @@
#include "Thread/WorkThreadPool.h" #include "Thread/WorkThreadPool.h"
#include "Rtp/RtpSelector.h" #include "Rtp/RtpSelector.h"
#include "FFmpegSource.h" #include "FFmpegSource.h"
using namespace Json; using namespace Json;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
typedef map<string,variant,StrCaseCompare> ApiArgsType;
#define API_ARGS TcpSession &sender, \
HttpSession::KeyValue &headerIn, \
HttpSession::KeyValue &headerOut, \
ApiArgsType &allArgs, \
Json::Value &val
#define API_REGIST(field, name, ...) \
s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker){ \
static auto lam = [&](API_ARGS) __VA_ARGS__ ; \
lam(sender,headerIn, headerOut, allArgs, val); \
invoker("200 OK", headerOut, val.toStyledString()); \
});
#define API_ARGS_VALUE sender,headerIn,headerOut,allArgs,val,invoker
#define API_REGIST_INVOKER(field, name, ...) \
s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker) __VA_ARGS__);
//异步http api lambad定义
typedef std::function<void(API_ARGS,const HttpSession::HttpResponseInvoker &invoker)> AsyncHttpApi;
//api列表
static map<string, AsyncHttpApi> s_map_api;
namespace API { namespace API {
typedef enum { typedef enum {
InvalidArgs = -300, InvalidArgs = -300,
@ -128,6 +100,27 @@ public:
~SuccessException() = default; ~SuccessException() = default;
}; };
#define API_ARGS1 TcpSession &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val
#define API_ARGS2 API_ARGS1, const HttpSession::HttpResponseInvoker &invoker
#define API_ARGS_VALUE1 sender,headerIn,headerOut,allArgs,val
#define API_ARGS_VALUE2 API_ARGS_VALUE1, invoker
typedef map<string, variant, StrCaseCompare> ApiArgsType;
//http api列表
static map<string, std::function<void(API_ARGS2)> > s_map_api;
template<typename FUNC>
static void api_regist1(const string &api_path, FUNC &&func) {
s_map_api.emplace(api_path, [func](API_ARGS2) {
func(API_ARGS_VALUE1);
invoker("200 OK", headerOut, val.toStyledString());
});
}
template<typename FUNC>
static void api_regist2(const string &api_path, FUNC &&func) {
s_map_api.emplace(api_path, std::forward<FUNC>(func));
}
//获取HTTP请求中url参数、content参数 //获取HTTP请求中url参数、content参数
static ApiArgsType getAllArgs(const Parser &parser) { static ApiArgsType getAllArgs(const Parser &parser) {
@ -275,12 +268,11 @@ static recursive_mutex s_ffmpegMapMtx;
*/ */
void installWebApi() { void installWebApi() {
addHttpListener(); addHttpListener();
GET_CONFIG(string,api_secret,API::kSecret); GET_CONFIG(string,api_secret,API::kSecret);
//获取线程负载 //获取线程负载
//测试url http://127.0.0.1/index/api/getThreadsLoad //测试url http://127.0.0.1/index/api/getThreadsLoad
API_REGIST_INVOKER(api, getThreadsLoad, { api_regist2("/index/api/getThreadsLoad",[](API_ARGS2){
EventPollerPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) { EventPollerPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) {
Value val; Value val;
auto vec = EventPollerPool::Instance().getExecutorLoad(); auto vec = EventPollerPool::Instance().getExecutorLoad();
@ -298,7 +290,7 @@ void installWebApi() {
//获取后台工作线程负载 //获取后台工作线程负载
//测试url http://127.0.0.1/index/api/getWorkThreadsLoad //测试url http://127.0.0.1/index/api/getWorkThreadsLoad
API_REGIST_INVOKER(api, getWorkThreadsLoad, { api_regist2("/index/api/getWorkThreadsLoad", [](API_ARGS2){
WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) { WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) {
Value val; Value val;
auto vec = WorkThreadPool::Instance().getExecutorLoad(); auto vec = WorkThreadPool::Instance().getExecutorLoad();
@ -316,7 +308,7 @@ void installWebApi() {
//获取服务器配置 //获取服务器配置
//测试url http://127.0.0.1/index/api/getServerConfig //测试url http://127.0.0.1/index/api/getServerConfig
API_REGIST(api, getServerConfig, { api_regist1("/index/api/getServerConfig",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
Value obj; Value obj;
for (auto &pr : mINI::Instance()) { for (auto &pr : mINI::Instance()) {
@ -328,7 +320,7 @@ void installWebApi() {
//设置服务器配置 //设置服务器配置
//测试url(比如关闭http api调试) http://127.0.0.1/index/api/setServerConfig?api.apiDebug=0 //测试url(比如关闭http api调试) http://127.0.0.1/index/api/setServerConfig?api.apiDebug=0
//你也可以通过http post方式传参可以通过application/x-www-form-urlencoded或application/json方式传参 //你也可以通过http post方式传参可以通过application/x-www-form-urlencoded或application/json方式传参
API_REGIST(api, setServerConfig, { api_regist1("/index/api/setServerConfig",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
auto &ini = mINI::Instance(); auto &ini = mINI::Instance();
int changed = API::Success; int changed = API::Success;
@ -352,19 +344,29 @@ void installWebApi() {
}); });
//获取服务器api列表 static auto s_get_api_list = [](API_ARGS1){
//测试url http://127.0.0.1/index/api/getApiList
API_REGIST(api,getApiList,{
CHECK_SECRET(); CHECK_SECRET();
for(auto &pr : s_map_api){ for(auto &pr : s_map_api){
val["data"].append(pr.first); val["data"].append(pr.first);
} }
};
//获取服务器api列表
//测试url http://127.0.0.1/index/api/getApiList
api_regist1("/index/api/getApiList",[](API_ARGS1){
s_get_api_list(API_ARGS_VALUE1);
});
//获取服务器api列表
//测试url http://127.0.0.1/index/
api_regist1("/index/",[](API_ARGS1){
s_get_api_list(API_ARGS_VALUE1);
}); });
#if !defined(_WIN32) #if !defined(_WIN32)
//重启服务器,只有Daemon方式才能重启否则是直接关闭 //重启服务器,只有Daemon方式才能重启否则是直接关闭
//测试url http://127.0.0.1/index/api/restartServer //测试url http://127.0.0.1/index/api/restartServer
API_REGIST(api,restartServer,{ api_regist1("/index/api/restartServer",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
EventPollerPool::Instance().getPoller()->doDelayTask(1000,[](){ EventPollerPool::Instance().getPoller()->doDelayTask(1000,[](){
//尝试正常退出 //尝试正常退出
@ -387,7 +389,7 @@ void installWebApi() {
//测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList //测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList
//测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__ //测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__
//测试url2(获取rtsp类型的流) http://127.0.0.1/index/api/getMediaList?schema=rtsp //测试url2(获取rtsp类型的流) http://127.0.0.1/index/api/getMediaList?schema=rtsp
API_REGIST(api,getMediaList,{ api_regist1("/index/api/getMediaList",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
//获取所有MediaSource列表 //获取所有MediaSource列表
MediaSource::for_each_media([&](const MediaSource::Ptr &media){ MediaSource::for_each_media([&](const MediaSource::Ptr &media){
@ -419,14 +421,14 @@ void installWebApi() {
}); });
//测试url http://127.0.0.1/index/api/isMediaOnline?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs //测试url http://127.0.0.1/index/api/isMediaOnline?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
API_REGIST(api,isMediaOnline,{ api_regist1("/index/api/isMediaOnline",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream"); CHECK_ARGS("schema","vhost","app","stream");
val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false)); val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false));
}); });
//测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
API_REGIST(api,getMediaInfo,{ api_regist1("/index/api/getMediaInfo",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream"); CHECK_ARGS("schema","vhost","app","stream");
auto src = MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false); auto src = MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false);
@ -448,7 +450,7 @@ void installWebApi() {
//主动关断流,包括关断拉流、推流 //主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
API_REGIST(api,close_stream,{ api_regist1("/index/api/close_stream",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream"); CHECK_ARGS("schema","vhost","app","stream");
//踢掉推流器 //踢掉推流器
@ -468,7 +470,7 @@ void installWebApi() {
//批量主动关断流,包括关断拉流、推流 //批量主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_streams?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 //测试url http://127.0.0.1/index/api/close_streams?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
API_REGIST(api,close_streams,{ api_regist1("/index/api/close_streams",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
//筛选命中个数 //筛选命中个数
int count_hit = 0; int count_hit = 0;
@ -504,7 +506,7 @@ void installWebApi() {
//获取所有TcpSession列表信息 //获取所有TcpSession列表信息
//可以根据本地端口和远端ip来筛选 //可以根据本地端口和远端ip来筛选
//测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935 //测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935
API_REGIST(api,getAllSession,{ api_regist1("/index/api/getAllSession",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
Value jsession; Value jsession;
uint16_t local_port = allArgs["local_port"].as<uint16_t>(); uint16_t local_port = allArgs["local_port"].as<uint16_t>();
@ -529,7 +531,7 @@ void installWebApi() {
//断开tcp连接比如说可以断开rtsp、rtmp播放器等 //断开tcp连接比如说可以断开rtsp、rtmp播放器等
//测试url http://127.0.0.1/index/api/kick_session?id=123456 //测试url http://127.0.0.1/index/api/kick_session?id=123456
API_REGIST(api,kick_session,{ api_regist1("/index/api/kick_session",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("id"); CHECK_ARGS("id");
//踢掉tcp会话 //踢掉tcp会话
@ -543,7 +545,7 @@ void installWebApi() {
//批量断开tcp连接比如说可以断开rtsp、rtmp播放器等 //批量断开tcp连接比如说可以断开rtsp、rtmp播放器等
//测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935 //测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935
API_REGIST(api,kick_sessions,{ api_regist1("/index/api/kick_sessions",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
uint16_t local_port = allArgs["local_port"].as<uint16_t>(); uint16_t local_port = allArgs["local_port"].as<uint16_t>();
string &peer_ip = allArgs["peer_ip"]; string &peer_ip = allArgs["peer_ip"];
@ -609,7 +611,7 @@ void installWebApi() {
//动态添加rtsp/rtmp拉流代理 //动态添加rtsp/rtmp拉流代理
//测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs
API_REGIST_INVOKER(api,addStreamProxy,{ api_regist2("/index/api/addStreamProxy",[](API_ARGS2){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp"); CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp");
addStreamProxy(allArgs["vhost"], addStreamProxy(allArgs["vhost"],
@ -634,7 +636,7 @@ void installWebApi() {
//关闭拉流代理 //关闭拉流代理
//测试url http://127.0.0.1/index/api/delStreamProxy?key=__defaultVhost__/proxy/0 //测试url http://127.0.0.1/index/api/delStreamProxy?key=__defaultVhost__/proxy/0
API_REGIST(api,delStreamProxy,{ api_regist1("/index/api/delStreamProxy",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<recursive_mutex> lck(s_proxyMapMtx); lock_guard<recursive_mutex> lck(s_proxyMapMtx);
@ -671,7 +673,7 @@ void installWebApi() {
//动态添加rtsp/rtmp拉流代理 //动态添加rtsp/rtmp拉流代理
//测试url http://127.0.0.1/index/api/addFFmpegSource?src_url=http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8&dst_url=rtmp://127.0.0.1/live/hks2&timeout_ms=10000 //测试url http://127.0.0.1/index/api/addFFmpegSource?src_url=http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8&dst_url=rtmp://127.0.0.1/live/hks2&timeout_ms=10000
API_REGIST_INVOKER(api,addFFmpegSource,{ api_regist2("/index/api/addFFmpegSource",[](API_ARGS2){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("src_url","dst_url","timeout_ms"); CHECK_ARGS("src_url","dst_url","timeout_ms");
auto src_url = allArgs["src_url"]; auto src_url = allArgs["src_url"];
@ -690,7 +692,7 @@ void installWebApi() {
}); });
static auto api_delFFmpegSource = [](API_ARGS,const HttpSession::HttpResponseInvoker &invoker){ static auto api_delFFmpegSource = [](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx);
@ -699,24 +701,24 @@ void installWebApi() {
//关闭拉流代理 //关闭拉流代理
//测试url http://127.0.0.1/index/api/delFFmepgSource?key=key //测试url http://127.0.0.1/index/api/delFFmepgSource?key=key
API_REGIST(api,delFFmpegSource,{ api_regist1("/index/api/delFFmpegSource",[](API_ARGS1){
api_delFFmpegSource(API_ARGS_VALUE); api_delFFmpegSource(API_ARGS_VALUE1);
}); });
//此处为了兼容之前的拼写错误 //此处为了兼容之前的拼写错误
API_REGIST(api,delFFmepgSource,{ api_regist1("/index/api/delFFmepgSource",[](API_ARGS1){
api_delFFmpegSource(API_ARGS_VALUE); api_delFFmpegSource(API_ARGS_VALUE1);
}); });
//新增http api下载可执行程序文件接口 //新增http api下载可执行程序文件接口
//测试url http://127.0.0.1/index/api/downloadBin //测试url http://127.0.0.1/index/api/downloadBin
API_REGIST_INVOKER(api,downloadBin,{ api_regist2("/index/api/downloadBin",[](API_ARGS2){
CHECK_SECRET(); CHECK_SECRET();
invoker.responseFile(headerIn,StrCaseMap(),exePath()); invoker.responseFile(headerIn,StrCaseMap(),exePath());
}); });
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
API_REGIST(api,getSsrcInfo,{ api_regist1("/index/api/getSsrcInfo",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("ssrc"); CHECK_ARGS("ssrc");
uint32_t ssrc = 0; uint32_t ssrc = 0;
@ -735,7 +737,7 @@ void installWebApi() {
#endif//ENABLE_RTPPROXY #endif//ENABLE_RTPPROXY
// 开始录制hls或MP4 // 开始录制hls或MP4
API_REGIST(api,startRecord,{ api_regist1("/index/api/startRecord",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream","wait_for_record","continue_record"); CHECK_ARGS("type","vhost","app","stream","wait_for_record","continue_record");
@ -750,7 +752,7 @@ void installWebApi() {
}); });
// 停止录制hls或MP4 // 停止录制hls或MP4
API_REGIST(api,stopRecord,{ api_regist1("/index/api/stopRecord",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream"); CHECK_ARGS("type","vhost","app","stream");
int result = Recorder::stopRecord((Recorder::type)allArgs["type"].as<int>(), int result = Recorder::stopRecord((Recorder::type)allArgs["type"].as<int>(),
@ -761,7 +763,7 @@ void installWebApi() {
}); });
// 获取hls或MP4录制状态 // 获取hls或MP4录制状态
API_REGIST(api,getRecordStatus,{ api_regist1("/index/api/getRecordStatus",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream"); CHECK_ARGS("type","vhost","app","stream");
auto status = Recorder::getRecordStatus((Recorder::type)allArgs["type"].as<int>(), auto status = Recorder::getRecordStatus((Recorder::type)allArgs["type"].as<int>(),
@ -826,7 +828,7 @@ void installWebApi() {
}); });
////////////以下是注册的Hook API//////////// ////////////以下是注册的Hook API////////////
API_REGIST(hook,on_publish,{ api_regist1("/index/hook/on_publish",[](API_ARGS1){
//开始推流事件 //开始推流事件
//转换成rtsp或rtmp //转换成rtsp或rtmp
val["enableRtxp"] = true; val["enableRtxp"] = true;
@ -836,23 +838,23 @@ void installWebApi() {
val["enableMP4"] = false; val["enableMP4"] = false;
}); });
API_REGIST(hook,on_play,{ api_regist1("/index/hook/on_play",[](API_ARGS1){
//开始播放事件 //开始播放事件
throw SuccessException(); throw SuccessException();
}); });
API_REGIST(hook,on_flow_report,{ api_regist1("/index/hook/on_flow_report",[](API_ARGS1){
//流量统计hook api //流量统计hook api
throw SuccessException(); throw SuccessException();
}); });
API_REGIST(hook,on_rtsp_realm,{ api_regist1("/index/hook/on_rtsp_realm",[](API_ARGS1){
//rtsp是否需要鉴权默认需要鉴权 //rtsp是否需要鉴权默认需要鉴权
val["code"] = API::Success; val["code"] = API::Success;
val["realm"] = "zlmediakit_reaml"; val["realm"] = "zlmediakit_reaml";
}); });
API_REGIST(hook,on_rtsp_auth,{ api_regist1("/index/hook/on_rtsp_auth",[](API_ARGS1){
//rtsp鉴权密码密码等于用户名 //rtsp鉴权密码密码等于用户名
//rtsp可以有双重鉴权后面还会触发on_play事件 //rtsp可以有双重鉴权后面还会触发on_play事件
CHECK_ARGS("user_name"); CHECK_ARGS("user_name");
@ -861,14 +863,14 @@ void installWebApi() {
val["passwd"] = allArgs["user_name"].data(); val["passwd"] = allArgs["user_name"].data();
}); });
API_REGIST(hook,on_stream_changed,{ api_regist1("/index/hook/on_stream_changed",[](API_ARGS1){
//媒体注册或反注册事件 //媒体注册或反注册事件
throw SuccessException(); throw SuccessException();
}); });
#if !defined(_WIN32) #if !defined(_WIN32)
API_REGIST_INVOKER(hook,on_stream_not_found_ffmpeg,{ api_regist2("/index/hook/on_stream_not_found_ffmpeg",[](API_ARGS2){
//媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流 //媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("vhost","app","stream"); CHECK_ARGS("vhost","app","stream");
@ -898,7 +900,7 @@ void installWebApi() {
}); });
#endif//!defined(_WIN32) #endif//!defined(_WIN32)
API_REGIST_INVOKER(hook,on_stream_not_found,{ api_regist2("/index/hook/on_stream_not_found",[](API_ARGS2){
//媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流 //媒体未找到事件,我们都及时拉流hks作为替代品目的是为了测试按需拉流
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("vhost","app","stream"); CHECK_ARGS("vhost","app","stream");
@ -924,17 +926,17 @@ void installWebApi() {
}); });
}); });
API_REGIST(hook,on_record_mp4,{ api_regist1("/index/hook/on_record_mp4",[](API_ARGS1){
//录制mp4分片完毕事件 //录制mp4分片完毕事件
throw SuccessException(); throw SuccessException();
}); });
API_REGIST(hook,on_shell_login,{ api_regist1("/index/hook/on_shell_login",[](API_ARGS1){
//shell登录调试事件 //shell登录调试事件
throw SuccessException(); throw SuccessException();
}); });
API_REGIST(hook,on_stream_none_reader,{ api_regist1("/index/hook/on_stream_none_reader",[](API_ARGS1){
//无人观看流默认关闭 //无人观看流默认关闭
val["close"] = true; val["close"] = true;
}); });
@ -944,7 +946,7 @@ void installWebApi() {
return true; return true;
}; };
API_REGIST(hook,on_http_access,{ api_regist1("/index/hook/on_http_access",[](API_ARGS1){
//在这里根据allArgs["params"](url参数)来判断该http客户端是否有权限访问该文件 //在这里根据allArgs["params"](url参数)来判断该http客户端是否有权限访问该文件
if(!checkAccess(allArgs["params"])){ if(!checkAccess(allArgs["params"])){
//无访问权限 //无访问权限
@ -965,7 +967,7 @@ void installWebApi() {
}); });
API_REGIST(hook,on_server_started,{ api_regist1("/index/hook/on_server_started",[](API_ARGS1){
//服务器重启报告 //服务器重启报告
throw SuccessException(); throw SuccessException();
}); });

View File

@ -76,17 +76,24 @@ AACRtmpEncoder::AACRtmpEncoder(const Track::Ptr &track) {
_track = dynamic_pointer_cast<AACTrack>(track); _track = dynamic_pointer_cast<AACTrack>(track);
} }
void AACRtmpEncoder::makeConfigPacket() {
if (_track && _track->ready()) {
//从track中和获取aac配置信息
_aac_cfg = _track->getAacCfg();
}
if (!_aac_cfg.empty()) {
makeAudioConfigPkt();
}
}
void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
if(_aac_cfg.empty()){ if (_aac_cfg.empty()) {
if(frame->prefixSize() >= 7){ if (frame->prefixSize() >= 7) {
//包含adts头,从adts头获取aac配置信息 //包含adts头,从adts头获取aac配置信息
_aac_cfg = makeAdtsConfig(reinterpret_cast<const uint8_t *>(frame->data())); _aac_cfg = makeAdtsConfig(reinterpret_cast<const uint8_t *>(frame->data()));
makeAudioConfigPkt();
} else if(_track && _track->ready()){
//从track中和获取aac配置信息
_aac_cfg = _track->getAacCfg();
makeAudioConfigPkt();
} }
makeConfigPacket();
} }
if(!_aac_cfg.empty()){ if(!_aac_cfg.empty()){

View File

@ -88,6 +88,10 @@ public:
*/ */
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
/**
* config包
*/
void makeConfigPacket() override;
private: private:
void makeAudioConfigPkt(); void makeAudioConfigPkt();
private: private:

View File

@ -99,7 +99,20 @@ inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dt
H264RtmpEncoder::H264RtmpEncoder(const Track::Ptr &track) { H264RtmpEncoder::H264RtmpEncoder(const Track::Ptr &track) {
_track = dynamic_pointer_cast<H264Track>(track); _track = dynamic_pointer_cast<H264Track>(track);
}
void H264RtmpEncoder::makeConfigPacket(){
if (_track && _track->ready()) {
//尝试从track中获取sps pps信息
_sps = _track->getSps();
_pps = _track->getPps();
}
if (!_sps.empty() && !_pps.empty()) {
//获取到sps/pps
makeVideoConfigPkt();
_gotSpsPps = true;
}
} }
void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
@ -107,37 +120,24 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
auto iLen = frame->size() - frame->prefixSize(); auto iLen = frame->size() - frame->prefixSize();
auto type = H264_TYPE(((uint8_t*)pcData)[0]); auto type = H264_TYPE(((uint8_t*)pcData)[0]);
if(!_gotSpsPps){ if (!_gotSpsPps) {
//尝试从frame中获取sps pps //尝试从frame中获取sps pps
switch (type){ switch (type) {
case H264Frame::NAL_SPS:{ case H264Frame::NAL_SPS: {
//sps //sps
if(_sps.empty()){ _sps = string(pcData, iLen);
_sps = string(pcData,iLen); makeConfigPacket();
}
}
break; break;
case H264Frame::NAL_PPS:{ }
case H264Frame::NAL_PPS: {
//pps //pps
if(_pps.empty()){ _pps = string(pcData, iLen);
_pps = string(pcData,iLen); makeConfigPacket();
}
}
break; break;
}
default: default:
break; break;
} }
if(_track && _track->ready()){
//尝试从track中获取sps pps信息
_sps = _track->getSps();
_pps = _track->getPps();
}
if(!_sps.empty() && !_pps.empty()){
_gotSpsPps = true;
makeVideoConfigPkt();
}
} }
if(type == H264Frame::NAL_SEI){ if(type == H264Frame::NAL_SEI){

View File

@ -90,6 +90,11 @@ public:
* @param frame * @param frame
*/ */
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
/**
* config包
*/
void makeConfigPacket() override;
private: private:
void makeVideoConfigPkt(); void makeVideoConfigPkt();
private: private:

View File

@ -189,9 +189,8 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
//该帧最后一个rtp包 FU-A end //该帧最后一个rtp包 FU-A end
_h264frame->_buffer.append((char *)frame + 2, length - 2); _h264frame->_buffer.append((char *)frame + 2, length - 2);
_h264frame->_pts = rtppack->timeStamp; _h264frame->_pts = rtppack->timeStamp;
auto key = _h264frame->keyFrame();
onGetH264(_h264frame); onGetH264(_h264frame);
return key; return false;
} }
default:{ default:{
@ -209,8 +208,15 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
} }
void H264RtpDecoder::onGetH264(const H264Frame::Ptr &frame) { void H264RtpDecoder::onGetH264(const H264Frame::Ptr &frame) {
//根据pts计算dts
auto flag = _dts_generator.getDts(frame->_pts,frame->_dts); auto flag = _dts_generator.getDts(frame->_pts,frame->_dts);
if(!flag){
if(frame->configFrame() || frame->keyFrame()){
flag = true;
frame->_dts = frame->_pts;
}
}
//根据pts计算dts
if(flag){ if(flag){
//写入环形缓存 //写入环形缓存
RtpCodec::inputFrame(frame); RtpCodec::inputFrame(frame);

View File

@ -127,9 +127,8 @@ bool H265RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
//该帧最后一个rtp包 //该帧最后一个rtp包
_h265frame->_buffer.append((char *) frame + 3, length - 3); _h265frame->_buffer.append((char *) frame + 3, length - 3);
_h265frame->_pts = rtppack->timeStamp; _h265frame->_pts = rtppack->timeStamp;
auto key = _h265frame->keyFrame();
onGetH265(_h265frame); onGetH265(_h265frame);
return key; return false;
} }
default: // 4.4.1. Single NAL Unit Packets (p24) default: // 4.4.1. Single NAL Unit Packets (p24)
@ -146,6 +145,12 @@ bool H265RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
void H265RtpDecoder::onGetH265(const H265Frame::Ptr &frame) { void H265RtpDecoder::onGetH265(const H265Frame::Ptr &frame) {
//计算dts //计算dts
auto flag = _dts_generator.getDts(frame->_pts,frame->_dts); auto flag = _dts_generator.getDts(frame->_pts,frame->_dts);
if(!flag){
if(frame->configFrame() || frame->keyFrame()){
flag = true;
frame->_dts = frame->_pts;
}
}
if(flag){ if(flag){
//写入环形缓存 //写入环形缓存
RtpCodec::inputFrame(frame); RtpCodec::inputFrame(frame);

View File

@ -30,16 +30,80 @@
#include "HttpSession.h" #include "HttpSession.h"
#include "Network/TcpServer.h" #include "Network/TcpServer.h"
/**
*
*/
class SendInterceptor{
public:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
SendInterceptor() = default;
virtual ~SendInterceptor() = default;
virtual void setOnBeforeSendCB(const onBeforeSendCB &cb) = 0;
};
/**
* TcpSession派生类发送数据的截取
* websocket协议的打包
*/
template <typename TcpSessionType>
class TcpSessionTypeImp : public TcpSessionType, public SendInterceptor{
public:
typedef std::shared_ptr<TcpSessionTypeImp> Ptr;
TcpSessionTypeImp(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock) :
_identifier(parent.getIdentifier()), TcpSessionType(pSock) {}
~TcpSessionTypeImp() {}
/**
*
* @param cb
*/
void setOnBeforeSendCB(const onBeforeSendCB &cb) override {
_beforeSendCB = cb;
}
protected:
/**
* send函数截取数据
* @param buf
* @return
*/
int send(const Buffer::Ptr &buf) override {
if (_beforeSendCB) {
return _beforeSendCB(buf);
}
return TcpSessionType::send(buf);
}
string getIdentifier() const override {
return _identifier;
}
private:
onBeforeSendCB _beforeSendCB;
string _identifier;
};
template <typename TcpSessionType>
class TcpSessionCreator {
public:
//返回的TcpSession必须派生于SendInterceptor可以返回null
TcpSession::Ptr operator()(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock){
return std::make_shared<TcpSessionTypeImp<TcpSessionType> >(header,parent,pSock);
}
};
/** /**
* WebSocket协议 * WebSocket协议
* WebSock协议下的具体业务协议WebSocket协议的Rtmp协议等 * WebSock协议下的具体业务协议WebSocket协议的Rtmp协议等
* @tparam SessionType TcpSession类
*/ */
template <class SessionType,class HttpSessionType = HttpSession,WebSocketHeader::Type DataType = WebSocketHeader::TEXT> template<typename Creator, typename HttpSessionType = HttpSession, WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSession : public HttpSessionType { class WebSocketSessionBase : public HttpSessionType {
public: public:
WebSocketSession(const Socket::Ptr &pSock) : HttpSessionType(pSock){} WebSocketSessionBase(const Socket::Ptr &pSock) : HttpSessionType(pSock){}
virtual ~WebSocketSession(){} virtual ~WebSocketSessionBase(){}
//收到eof或其他导致脱离TcpServer事件的回调 //收到eof或其他导致脱离TcpServer事件的回调
void onError(const SockException &err) override{ void onError(const SockException &err) override{
@ -69,23 +133,27 @@ protected:
*/ */
bool onWebSocketConnect(const Parser &header) override{ bool onWebSocketConnect(const Parser &header) override{
//创建websocket session类 //创建websocket session类
_session = std::make_shared<SessionImp>(HttpSessionType::getIdentifier(),HttpSessionType::_sock); _session = _creator(header, *this,HttpSessionType::_sock);
if(!_session){
//此url不允许创建websocket连接
return false;
}
auto strongServer = _weakServer.lock(); auto strongServer = _weakServer.lock();
if(strongServer){ if(strongServer){
_session->attachServer(*strongServer); _session->attachServer(*strongServer);
} }
//此处截取数据并进行websocket协议打包 //此处截取数据并进行websocket协议打包
weak_ptr<WebSocketSession> weakSelf = dynamic_pointer_cast<WebSocketSession>(HttpSessionType::shared_from_this()); weak_ptr<WebSocketSessionBase> weakSelf = dynamic_pointer_cast<WebSocketSessionBase>(HttpSessionType::shared_from_this());
_session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ dynamic_pointer_cast<SendInterceptor>(_session)->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(strongSelf){ if (strongSelf) {
WebSocketHeader header; WebSocketHeader header;
header._fin = true; header._fin = true;
header._reserved = 0; header._reserved = 0;
header._opcode = DataType; header._opcode = DataType;
header._mask_flag = false; header._mask_flag = false;
strongSelf->WebSocketSplitter::encode(header,buf); strongSelf->WebSocketSplitter::encode(header, buf);
} }
return buf->size(); return buf->size();
}); });
@ -155,50 +223,19 @@ protected:
void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{ void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
SocketHelper::send(buffer); SocketHelper::send(buffer);
} }
private:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
/**
* TcpSession派生类发送数据的截取
* websocket协议的打包
*/
class SessionImp : public SessionType{
public:
SessionImp(const string &identifier,const Socket::Ptr &pSock) :
_identifier(identifier),SessionType(pSock){}
~SessionImp(){}
/**
*
* @param cb
*/
void setOnBeforeSendCB(const onBeforeSendCB &cb){
_beforeSendCB = cb;
}
protected:
/**
* send函数截取数据
* @param buf
* @return
*/
int send(const Buffer::Ptr &buf) override {
if(_beforeSendCB){
return _beforeSendCB(buf);
}
return SessionType::send(buf);
}
string getIdentifier() const override{
return _identifier;
}
private:
onBeforeSendCB _beforeSendCB;
string _identifier;
};
private: private:
string _remian_data; string _remian_data;
weak_ptr<TcpServer> _weakServer; weak_ptr<TcpServer> _weakServer;
std::shared_ptr<SessionImp> _session; TcpSession::Ptr _session;
Creator _creator;
}; };
template<typename TcpSessionType,typename HttpSessionType = HttpSession,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSession : public WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>{
public:
WebSocketSession(const Socket::Ptr &pSock) : WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>(pSock){}
virtual ~WebSocketSession(){}
};
#endif //ZLMEDIAKIT_WEBSOCKETSESSION_H #endif //ZLMEDIAKIT_WEBSOCKETSESSION_H

View File

@ -80,6 +80,7 @@ public:
typedef std::shared_ptr<RtmpCodec> Ptr; typedef std::shared_ptr<RtmpCodec> Ptr;
RtmpCodec(){} RtmpCodec(){}
virtual ~RtmpCodec(){} virtual ~RtmpCodec(){}
virtual void makeConfigPacket() {};
}; };

View File

@ -142,7 +142,11 @@ public:
} }
strongSelf->onReaderChanged(size); strongSelf->onReaderChanged(size);
}; };
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
//rtmp包缓存最大允许512个如果是纯视频(25fps)大概为20秒数据
//但是这个是GOP缓存的上限值真实的GOP缓存大小等于两个I帧之间的包数的两倍
//而且每次遇到I帧则会清空GOP缓存所以真实的GOP缓存远小于此值
_ring = std::make_shared<RingType>(_ring_size,512,std::move(lam));
onReaderChanged(0); onReaderChanged(0);
if(_metadata){ if(_metadata){

View File

@ -58,6 +58,7 @@ public:
} }
void onAllTrackReady(){ void onAllTrackReady(){
makeConfigPacket();
_mediaSouce->setMetaData(getMetadata()); _mediaSouce->setMetaData(getMetadata());
} }

View File

@ -78,6 +78,14 @@ void RtmpMuxer::inputFrame(const Frame::Ptr &frame) {
} }
} }
void RtmpMuxer::makeConfigPacket(){
for(auto &encoder : _encoder){
if(encoder){
encoder->makeConfigPacket();
}
}
}
const AMFValue &RtmpMuxer::getMetadata() const { const AMFValue &RtmpMuxer::getMetadata() const {
return _metadata; return _metadata;
} }

View File

@ -71,6 +71,11 @@ public:
* track * track
*/ */
void resetTracks() override ; void resetTracks() override ;
/**
* config包
*/
void makeConfigPacket();
private: private:
RtmpRing::RingType::Ptr _rtmpRing; RtmpRing::RingType::Ptr _rtmpRing;
AMFValue _metadata; AMFValue _metadata;

View File

@ -85,74 +85,84 @@ string SdpTrack::toString() const {
} }
return _printer; return _printer;
} }
void SdpParser::load(const string &sdp) {
_track_map.clear();
string key;
SdpTrack::Ptr track = std::make_shared<SdpTrack>();
auto lines = split(sdp,"\n"); static TrackType toTrackType(const string &str) {
for (auto &line : lines){ if (str == "") {
trim(line); return TrackTitle;
if(line.size() < 2 || line[1] != '='){
continue;
}
char opt = line[0];
string opt_val = line.substr(2);
switch (opt){
case 'o':
track->_o = opt_val;
break;
case 's':
track->_s = opt_val;
break;
case 'i':
track->_i = opt_val;
break;
case 'c':
track->_c = opt_val;
break;
case 't':
track->_t = opt_val;
break;
case 'b':
track->_b = opt_val;
break;
case 'm':{
_track_map[key] = track;
track = std::make_shared<SdpTrack>();
key = FindField(opt_val.data(), nullptr," ");
track->_m = opt_val;
}
break;
case 'a':{
string attr = FindField(opt_val.data(), nullptr,":");
if(attr.empty()){
track->_attr[opt_val] = "";
}else{
track->_attr[attr] = FindField(opt_val.data(),":", nullptr);
}
}
break;
default:
track->_other[opt] = opt_val;
break;
}
} }
_track_map[key] = track;
if (str == "video") {
return TrackVideo;
}
for (auto &pr : _track_map) { if (str == "audio") {
auto &track = *pr.second; return TrackAudio;
if (pr.first == "") { }
track._type = TrackTitle;
} else if (pr.first == "video") { return TrackInvalid;
track._type = TrackVideo; }
} else if (pr.first == "audio") {
track._type = TrackAudio; void SdpParser::load(const string &sdp) {
} else { {
track._type = TrackInvalid; _track_vec.clear();
string key;
SdpTrack::Ptr track = std::make_shared<SdpTrack>();
auto lines = split(sdp, "\n");
for (auto &line : lines) {
trim(line);
if (line.size() < 2 || line[1] != '=') {
continue;
}
char opt = line[0];
string opt_val = line.substr(2);
switch (opt) {
case 'o':
track->_o = opt_val;
break;
case 's':
track->_s = opt_val;
break;
case 'i':
track->_i = opt_val;
break;
case 'c':
track->_c = opt_val;
break;
case 't':
track->_t = opt_val;
break;
case 'b':
track->_b = opt_val;
break;
case 'm': {
track->_type = toTrackType(key);
_track_vec.emplace_back(track);
track = std::make_shared<SdpTrack>();
key = FindField(opt_val.data(), nullptr, " ");
track->_m = opt_val;
}
break;
case 'a': {
string attr = FindField(opt_val.data(), nullptr, ":");
if (attr.empty()) {
track->_attr[opt_val] = "";
} else {
track->_attr[attr] = FindField(opt_val.data(), ":", nullptr);
}
}
break;
default:
track->_other[opt] = opt_val;
break;
}
} }
track->_type = toTrackType(key);
_track_vec.emplace_back(track);
}
for (auto &track_ptr : _track_vec) {
auto &track = *track_ptr;
auto it = track._attr.find("range"); auto it = track._attr.find("range");
if (it != track._attr.end()) { if (it != track._attr.end()) {
char name[16] = {0}, start[16] = {0}, end[16] = {0}; char name[16] = {0}, start[16] = {0}, end[16] = {0};
@ -198,9 +208,9 @@ bool SdpParser::available() const {
} }
SdpTrack::Ptr SdpParser::getTrack(TrackType type) const { SdpTrack::Ptr SdpParser::getTrack(TrackType type) const {
for (auto &pr : _track_map){ for (auto &track : _track_vec){
if(pr.second->_type == type){ if(track->_type == type){
return pr.second; return track;
} }
} }
return nullptr; return nullptr;
@ -208,31 +218,42 @@ SdpTrack::Ptr SdpParser::getTrack(TrackType type) const {
vector<SdpTrack::Ptr> SdpParser::getAvailableTrack() const { vector<SdpTrack::Ptr> SdpParser::getAvailableTrack() const {
vector<SdpTrack::Ptr> ret; vector<SdpTrack::Ptr> ret;
auto video = getTrack(TrackVideo); bool audio_added = false;
if(video){ bool video_added = false;
ret.emplace_back(video); for (auto &track : _track_vec){
if(track->_type == TrackAudio ){
if(!audio_added){
ret.emplace_back(track);
audio_added = true;
}
continue;
}
if(track->_type == TrackVideo ){
if(!video_added){
ret.emplace_back(track);
video_added = true;
}
continue;
}
} }
auto audio = getTrack(TrackAudio); return std::move(ret);
if(audio){
ret.emplace_back(audio);
}
return ret;
} }
string SdpParser::toString() const { string SdpParser::toString() const {
string title,audio,video; string title,audio,video;
for(auto &pr : _track_map){ for(auto &track : _track_vec){
switch (pr.second->_type){ switch (track->_type){
case TrackTitle:{ case TrackTitle:{
title = pr.second->toString(); title = track->toString();
} }
break; break;
case TrackVideo:{ case TrackVideo:{
video = pr.second->toString(); video = track->toString();
} }
break; break;
case TrackAudio:{ case TrackAudio:{
audio = pr.second->toString(); audio = track->toString();
} }
break; break;
default: default:

View File

@ -122,7 +122,7 @@ public:
vector<SdpTrack::Ptr> getAvailableTrack() const; vector<SdpTrack::Ptr> getAvailableTrack() const;
string toString() const ; string toString() const ;
private: private:
map<string, SdpTrack::Ptr> _track_map; vector<SdpTrack::Ptr> _track_vec;
}; };
/** /**

View File

@ -179,7 +179,10 @@ public:
} }
strongSelf->onReaderChanged(size); strongSelf->onReaderChanged(size);
}; };
_ring = std::make_shared<RingType>(_ring_size, std::move(lam)); //rtp包缓存最大允许2048个大概最多3MB数据
//但是这个是GOP缓存的上限值真实的GOP缓存大小等于两个I帧之间的包数的两倍
//而且每次遇到I帧则会清空GOP缓存所以真实的GOP缓存远小于此值
_ring = std::make_shared<RingType>(_ring_size,2048, std::move(lam));
onReaderChanged(0); onReaderChanged(0);
if (!_sdp.empty()) { if (!_sdp.empty()) {
regist(); regist();

View File

@ -51,6 +51,7 @@ public:
} }
void onRecv(const Buffer::Ptr &buffer) override { void onRecv(const Buffer::Ptr &buffer) override {
//回显数据 //回显数据
send("from EchoSession:");
send(buffer); send(buffer);
} }
void onError(const SockException &err) override{ void onError(const SockException &err) override{
@ -62,6 +63,48 @@ public:
} }
}; };
class EchoSessionWithUrl : public TcpSession {
public:
EchoSessionWithUrl(const Socket::Ptr &pSock) : TcpSession(pSock){
DebugL;
}
virtual ~EchoSessionWithUrl(){
DebugL;
}
void attachServer(const TcpServer &server) override{
DebugL << getIdentifier() << " " << TcpSession::getIdentifier();
}
void onRecv(const Buffer::Ptr &buffer) override {
//回显数据
send("from EchoSessionWithUrl:");
send(buffer);
}
void onError(const SockException &err) override{
WarnL << err.what();
}
//每隔一段时间触发,用来做超时管理
void onManager() override{
DebugL;
}
};
/**
* websocket 访url选择创建不同的对象
*/
struct EchoSessionCreator {
//返回的TcpSession必须派生于SendInterceptor可以返回null(拒绝连接)
TcpSession::Ptr operator()(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock) {
// return nullptr;
if (header.Url() == "/") {
return std::make_shared<TcpSessionTypeImp<EchoSession> >(header, parent, pSock);
}
return std::make_shared<TcpSessionTypeImp<EchoSessionWithUrl> >(header, parent, pSock);
}
};
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>()); Logger::Instance().add(std::make_shared<ConsoleChannel>());
@ -71,13 +114,19 @@ int main(int argc, char *argv[]) {
TcpServer::Ptr httpSrv(new TcpServer()); TcpServer::Ptr httpSrv(new TcpServer());
//http服务器,支持websocket //http服务器,支持websocket
httpSrv->start<WebSocketSession<EchoSession,HttpSession>>(80);//默认80 httpSrv->start<WebSocketSessionBase<EchoSessionCreator,HttpSession> >(80);//默认80
TcpServer::Ptr httpsSrv(new TcpServer()); TcpServer::Ptr httpsSrv(new TcpServer());
//https服务器,支持websocket //https服务器,支持websocket
httpsSrv->start<WebSocketSession<EchoSession,HttpsSession>>(443);//默认443 httpsSrv->start<WebSocketSessionBase<EchoSessionCreator,HttpsSession> >(443);//默认443
TcpServer::Ptr httpSrvOld(new TcpServer());
//兼容之前的代码(但是不支持根据url选择生成TcpSession类型)
httpSrvOld->start<WebSocketSession<EchoSession,HttpSession> >(8080);
DebugL << "请打开网页:http://www.websocket-test.com/,进行测试";
DebugL << "连接 ws://127.0.0.1/xxxxws://127.0.0.1/ 测试的效果将不同支持根据url选择不同的处理逻辑";
DebugL << "请打开网页:http://www.websocket-test.com/,连接 ws://127.0.0.1/测试";
//设置退出信号处理函数 //设置退出信号处理函数
static semaphore sem; static semaphore sem;