mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-10-30 16:27:36 +08:00
新增GB28181 tcp passive被动发送接口(startSendRtpPassive)
This commit is contained in:
parent
4f768cacf1
commit
8231c5c293
@ -1050,7 +1050,7 @@
|
||||
"method": "GET",
|
||||
"header": [],
|
||||
"url": {
|
||||
"raw": "{{ZLMediaKit_URL}}/index/api/startRecord?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&stamp=1000",
|
||||
"raw": "{{ZLMediaKit_URL}}/index/api/seekRecordStamp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&stamp",
|
||||
"host": [
|
||||
"{{ZLMediaKit_URL}}"
|
||||
],
|
||||
@ -1517,6 +1517,82 @@
|
||||
},
|
||||
"response": []
|
||||
},
|
||||
{
|
||||
"name": "开始tcp passive被动发送rtp(startSendRtpPassive)",
|
||||
"request": {
|
||||
"method": "GET",
|
||||
"header": [],
|
||||
"url": {
|
||||
"raw": "{{ZLMediaKit_URL}}/index/api/startSendRtpPassive?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=test&ssrc=1",
|
||||
"host": [
|
||||
"{{ZLMediaKit_URL}}"
|
||||
],
|
||||
"path": [
|
||||
"index",
|
||||
"api",
|
||||
"startSendRtpPassive"
|
||||
],
|
||||
"query": [
|
||||
{
|
||||
"key": "secret",
|
||||
"value": "{{ZLMediaKit_secret}}",
|
||||
"description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数"
|
||||
},
|
||||
{
|
||||
"key": "vhost",
|
||||
"value": "{{defaultVhost}}",
|
||||
"description": "虚拟主机,例如__defaultVhost__"
|
||||
},
|
||||
{
|
||||
"key": "app",
|
||||
"value": "live",
|
||||
"description": "应用名,例如 live"
|
||||
},
|
||||
{
|
||||
"key": "stream",
|
||||
"value": "test",
|
||||
"description": "流id,例如 obs"
|
||||
},
|
||||
{
|
||||
"key": "ssrc",
|
||||
"value": "1",
|
||||
"description": "rtp推流的ssrc,ssrc不同时,可以推流到多个上级服务器"
|
||||
},
|
||||
{
|
||||
"key": "src_port",
|
||||
"value": "0",
|
||||
"description": "指定tcp/udp客户端使用的本地端口,0时为随机端口,该参数非必选参数,不传时为随机端口。",
|
||||
"disabled": true
|
||||
},
|
||||
{
|
||||
"key": "from_mp4",
|
||||
"value": "0",
|
||||
"description": "是否推送本地MP4录像,该参数非必选参数",
|
||||
"disabled": true
|
||||
},
|
||||
{
|
||||
"key": "use_ps",
|
||||
"value": "1",
|
||||
"description": "rtp打包采用ps还是es模式,默认采用ps模式,该参数非必选参数",
|
||||
"disabled": true
|
||||
},
|
||||
{
|
||||
"key": "pt",
|
||||
"value": "96",
|
||||
"description": "rtp payload type,默认96,该参数非必选参数",
|
||||
"disabled": true
|
||||
},
|
||||
{
|
||||
"key": "only_audio",
|
||||
"value": "1",
|
||||
"description": "rtp es方式打包时,是否只打包音频,该参数非必选参数",
|
||||
"disabled": true
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"response": []
|
||||
},
|
||||
{
|
||||
"name": "停止 发送rtp(stopSendRtp)",
|
||||
"request": {
|
||||
|
@ -1107,6 +1107,7 @@ void installWebApi() {
|
||||
}
|
||||
|
||||
MediaSourceEvent::SendRtpArgs args;
|
||||
args.passive = false;
|
||||
args.dst_url = allArgs["dst_url"];
|
||||
args.dst_port = allArgs["dst_port"];
|
||||
args.ssrc = allArgs["ssrc"];
|
||||
@ -1115,7 +1116,7 @@ void installWebApi() {
|
||||
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
|
||||
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
|
||||
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>();
|
||||
TraceL << "pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
|
||||
TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
|
||||
|
||||
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
|
||||
if (ex) {
|
||||
@ -1127,6 +1128,34 @@ void installWebApi() {
|
||||
});
|
||||
});
|
||||
|
||||
api_regist("/index/api/startSendRtpPassive",[](API_ARGS_MAP_ASYNC){
|
||||
CHECK_SECRET();
|
||||
CHECK_ARGS("vhost", "app", "stream", "ssrc");
|
||||
|
||||
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as<int>());
|
||||
if (!src) {
|
||||
throw ApiRetException("该媒体流不存在", API::OtherFailed);
|
||||
}
|
||||
|
||||
MediaSourceEvent::SendRtpArgs args;
|
||||
args.passive = true;
|
||||
args.ssrc = allArgs["ssrc"];
|
||||
args.is_udp = false;
|
||||
args.src_port = allArgs["src_port"];
|
||||
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
|
||||
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
|
||||
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>();
|
||||
TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
|
||||
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
|
||||
if (ex) {
|
||||
val["code"] = API::OtherFailed;
|
||||
val["msg"] = ex.what();
|
||||
}
|
||||
val["local_port"] = local_port;
|
||||
invoker(200, headerOut, val.toStyledString());
|
||||
});
|
||||
});
|
||||
|
||||
api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP){
|
||||
CHECK_SECRET();
|
||||
CHECK_ARGS("vhost", "app", "stream");
|
||||
|
@ -94,6 +94,8 @@ public:
|
||||
bool use_ps = true;
|
||||
//发送es流时指定是否只发送纯音频流
|
||||
bool only_audio = true;
|
||||
//tcp被动方式
|
||||
bool passive = false;
|
||||
// rtp payload type
|
||||
uint8_t pt = 96;
|
||||
// 指定rtp ssrc
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "RtpSender.h"
|
||||
#include "Rtsp/RtspSession.h"
|
||||
#include "Thread/WorkThreadPool.h"
|
||||
#include "Util/uv_errno.h"
|
||||
#include "RtpCache.h"
|
||||
|
||||
using namespace std;
|
||||
@ -19,17 +20,65 @@ using namespace toolkit;
|
||||
|
||||
namespace mediakit{
|
||||
|
||||
RtpSender::RtpSender() {
|
||||
_poller = EventPollerPool::Instance().getPoller();
|
||||
_socket = Socket::createSocket(_poller, false);
|
||||
}
|
||||
|
||||
void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const function<void(uint16_t local_port, const SockException &ex)> &cb){
|
||||
_args = args;
|
||||
_poller = EventPollerPool::Instance().getPoller();
|
||||
auto lam = [this](std::shared_ptr<List<Buffer::Ptr>> list) { onFlushRtpList(std::move(list)); };
|
||||
if (args.use_ps) {
|
||||
_interface = std::make_shared<RtpCachePS>(lam, atoi(args.ssrc.data()), args.pt);
|
||||
} else {
|
||||
_interface = std::make_shared<RtpCacheRaw>(lam, atoi(args.ssrc.data()), args.pt, args.only_audio);
|
||||
if (!_interface) {
|
||||
//重连时不重新创建对象
|
||||
auto lam = [this](std::shared_ptr<List<Buffer::Ptr>> list) { onFlushRtpList(std::move(list)); };
|
||||
if (args.use_ps) {
|
||||
_interface = std::make_shared<RtpCachePS>(lam, atoi(args.ssrc.data()), args.pt);
|
||||
} else {
|
||||
_interface = std::make_shared<RtpCacheRaw>(lam, atoi(args.ssrc.data()), args.pt, args.only_audio);
|
||||
}
|
||||
}
|
||||
_socket = Socket::createSocket(_poller, false);
|
||||
|
||||
weak_ptr<RtpSender> weak_self = shared_from_this();
|
||||
if (args.passive) {
|
||||
// tcp被动发流模式
|
||||
_args.is_udp = false;
|
||||
try {
|
||||
auto tcp_listener = Socket::createSocket(_poller, false);
|
||||
if (args.src_port) {
|
||||
//指定端口
|
||||
if (!tcp_listener->listen(args.src_port)) {
|
||||
throw std::invalid_argument(StrPrinter << "open tcp passive server failed on port:" << args.src_port
|
||||
<< ", err:" << get_uv_errmsg(true));
|
||||
}
|
||||
} else {
|
||||
auto pr = std::make_pair(tcp_listener, Socket::createSocket(_poller, false));
|
||||
//从端口池获取随机端口
|
||||
makeSockPair(pr, "::", false, false);
|
||||
}
|
||||
// tcp服务器默认开启5秒
|
||||
auto delay_task = _poller->doDelayTask(5 * 1000, [tcp_listener, cb]() mutable {
|
||||
cb(0, SockException(Err_timeout, "wait tcp connection timeout"));
|
||||
tcp_listener = nullptr;
|
||||
return 0;
|
||||
});
|
||||
tcp_listener->setOnAccept([weak_self, cb, delay_task](Socket::Ptr &sock, std::shared_ptr<void> &complete) {
|
||||
auto strong_self = weak_self.lock();
|
||||
if (!strong_self) {
|
||||
return;
|
||||
}
|
||||
//立即关闭tcp服务器
|
||||
delay_task->cancel();
|
||||
strong_self->_socket = sock;
|
||||
strong_self->onConnect();
|
||||
cb(sock->get_local_port(), SockException());
|
||||
InfoL << "accept connection from:" << sock->get_peer_ip() << ":" << sock->get_peer_port();
|
||||
});
|
||||
InfoL << "start tcp passive server on:" << tcp_listener->get_local_port();
|
||||
} catch (std::exception &ex) {
|
||||
cb(0, SockException(Err_other, ex.what()));
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (args.is_udp) {
|
||||
auto poller = _poller;
|
||||
WorkThreadPool::Instance().getPoller()->async([cb, args, weak_self, poller]() {
|
||||
@ -54,12 +103,14 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct
|
||||
try {
|
||||
if (args.src_port) {
|
||||
//指定端口
|
||||
strong_self->_socket->bindUdpSock(args.src_port, ifr_ip);
|
||||
if (!strong_self->_socket->bindUdpSock(args.src_port, ifr_ip)) {
|
||||
throw std::invalid_argument(StrPrinter << "bindUdpSock failed on port:" << args.src_port
|
||||
<< ", err:" << get_uv_errmsg(true));
|
||||
}
|
||||
} else {
|
||||
auto pr = std::make_pair(std::move(strong_self->_socket), Socket::createSocket(strong_self->_poller, false));
|
||||
auto pr = std::make_pair(strong_self->_socket, Socket::createSocket(strong_self->_poller, false));
|
||||
//从端口池获取随机端口
|
||||
makeSockPair(pr, ifr_ip, true);
|
||||
strong_self->_socket = std::move(pr.first);
|
||||
}
|
||||
} catch (std::exception &ex) {
|
||||
cb(0, SockException(Err_other, ex.what()));
|
||||
@ -82,7 +133,7 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct
|
||||
} else {
|
||||
cb(0, err);
|
||||
}
|
||||
}, 5.0F, "::", args.src_port);
|
||||
}, 5.0F, "::", args.src_port);
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,11 +204,15 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
|
||||
void RtpSender::onErr(const SockException &ex, bool is_connect) {
|
||||
_is_connect = false;
|
||||
|
||||
//监听socket断开事件,方便重连
|
||||
if (is_connect) {
|
||||
WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what();
|
||||
if (_args.passive) {
|
||||
WarnL << "tcp passive connection lost: " << ex.what();
|
||||
} else {
|
||||
WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what();
|
||||
//监听socket断开事件,方便重连
|
||||
if (is_connect) {
|
||||
WarnL << "重连" << _args.dst_url << ":" << _args.dst_port << "失败, 原因为:" << ex.what();
|
||||
} else {
|
||||
WarnL << "停止发送 rtp:" << _args.dst_url << ":" << _args.dst_port << ", 原因为:" << ex.what();
|
||||
}
|
||||
}
|
||||
|
||||
weak_ptr<RtpSender> weak_self = shared_from_this();
|
||||
|
@ -21,7 +21,7 @@ class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this
|
||||
public:
|
||||
typedef std::shared_ptr<RtpSender> Ptr;
|
||||
|
||||
RtpSender() = default;
|
||||
RtpSender();
|
||||
~RtpSender() override = default;
|
||||
|
||||
/**
|
||||
|
@ -312,7 +312,8 @@ string SdpParser::toString() const {
|
||||
return title + video + audio;
|
||||
}
|
||||
|
||||
class PortManager : public std::enable_shared_from_this<PortManager> {
|
||||
template<int type>
|
||||
class PortManager : public std::enable_shared_from_this<PortManager<type> > {
|
||||
public:
|
||||
PortManager() {
|
||||
static auto func = [](const string &str, int index) {
|
||||
@ -331,28 +332,44 @@ public:
|
||||
return *instance;
|
||||
}
|
||||
|
||||
void bindUdpSock(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip, bool re_use_port) {
|
||||
void makeSockPair(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip, bool re_use_port, bool is_udp) {
|
||||
auto &sock0 = pair.first;
|
||||
auto &sock1 = pair.second;
|
||||
auto sock_pair = getPortPair();
|
||||
if (!sock_pair) {
|
||||
throw runtime_error("none reserved udp port in pool");
|
||||
throw runtime_error("none reserved port in pool");
|
||||
}
|
||||
if (is_udp) {
|
||||
if (!sock0->bindUdpSock(2 * *sock_pair, local_ip.data(), re_use_port)) {
|
||||
//分配端口失败
|
||||
throw runtime_error("open udp socket[0] failed");
|
||||
}
|
||||
|
||||
if (!sock0->bindUdpSock(2 * *sock_pair, local_ip.data(), re_use_port)) {
|
||||
//分配端口失败
|
||||
throw runtime_error("open udp socket[0] failed");
|
||||
if (!sock1->bindUdpSock(2 * *sock_pair + 1, local_ip.data(), re_use_port)) {
|
||||
//分配端口失败
|
||||
throw runtime_error("open udp socket[1] failed");
|
||||
}
|
||||
|
||||
auto on_cycle = [sock_pair](Socket::Ptr &, std::shared_ptr<void> &) {};
|
||||
// udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象
|
||||
sock0->setOnAccept(on_cycle);
|
||||
sock1->setOnAccept(on_cycle);
|
||||
} else {
|
||||
if (!sock0->listen(2 * *sock_pair, local_ip.data())) {
|
||||
//分配端口失败
|
||||
throw runtime_error("listen tcp socket[0] failed");
|
||||
}
|
||||
|
||||
if (!sock1->listen(2 * *sock_pair + 1, local_ip.data())) {
|
||||
//分配端口失败
|
||||
throw runtime_error("listen tcp socket[1] failed");
|
||||
}
|
||||
|
||||
auto on_cycle = [sock_pair](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {};
|
||||
// udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象
|
||||
sock0->setOnRead(on_cycle);
|
||||
sock1->setOnRead(on_cycle);
|
||||
}
|
||||
|
||||
if (!sock1->bindUdpSock(2 * *sock_pair + 1, local_ip.data(), re_use_port)) {
|
||||
//分配端口失败
|
||||
throw runtime_error("open udp socket[1] failed");
|
||||
}
|
||||
|
||||
auto on_cycle = [sock_pair](Socket::Ptr &, std::shared_ptr<void> &) {};
|
||||
// udp socket没onAccept事件,设置该回调,目的是为了在销毁socket时,回收对象
|
||||
sock0->setOnAccept(on_cycle);
|
||||
sock1->setOnAccept(on_cycle);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -372,7 +389,7 @@ private:
|
||||
_port_pair_pool.pop_front();
|
||||
InfoL << "got port from pool:" << 2 * pos << "-" << 2 * pos + 1;
|
||||
|
||||
weak_ptr<PortManager> weak_self = shared_from_this();
|
||||
weak_ptr<PortManager> weak_self = this->shared_from_this();
|
||||
std::shared_ptr<uint16_t> ret(new uint16_t(pos), [weak_self, pos](uint16_t *ptr) {
|
||||
delete ptr;
|
||||
auto strong_self = weak_self.lock();
|
||||
@ -392,20 +409,25 @@ private:
|
||||
deque<uint16_t> _port_pair_pool;
|
||||
};
|
||||
|
||||
void makeSockPair(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip, bool re_use_port) {
|
||||
void makeSockPair(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip, bool re_use_port, bool is_udp) {
|
||||
//全局互斥锁保护,防止端口重复分配
|
||||
static recursive_mutex s_mtx;
|
||||
lock_guard<recursive_mutex> lck(s_mtx);
|
||||
int try_count = 0;
|
||||
while (true) {
|
||||
try {
|
||||
PortManager::Instance().bindUdpSock(pair, local_ip, re_use_port);
|
||||
//udp和tcp端口池使用相同算法和范围分配,但是互不相干
|
||||
if (is_udp) {
|
||||
PortManager<0>::Instance().makeSockPair(pair, local_ip, re_use_port, is_udp);
|
||||
} else {
|
||||
PortManager<1>::Instance().makeSockPair(pair, local_ip, re_use_port, is_udp);
|
||||
}
|
||||
break;
|
||||
} catch (exception &ex) {
|
||||
if (++try_count == 3) {
|
||||
throw;
|
||||
}
|
||||
WarnL << "open udp socket failed:" << ex.what() << ", retry: " << try_count;
|
||||
WarnL << "open socket failed:" << ex.what() << ", retry: " << try_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ private:
|
||||
//创建rtp over tcp4个字节的头
|
||||
toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved);
|
||||
//创建rtp-rtcp端口对
|
||||
void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false);
|
||||
void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
|
||||
//十六进制方式打印ssrc
|
||||
std::string printSSRC(uint32_t ui32Ssrc);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user