openRtpServer接口增加单视频参数,加快单视频流注册速度 (#3342)

only_audio -> only_track
This commit is contained in:
waken 2024-03-05 17:06:31 +08:00 committed by GitHub
parent ffdc13bfb9
commit 79b2aa6adc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 47 additions and 34 deletions

View File

@ -1,11 +1,11 @@
{ {
"info": { "info": {
"_postman_id": "08e3bc35-5318-4949-81bb-90d854706194", "_postman_id": "08c77fc3-7670-428c-bde4-80c8cc9f389f",
"name": "ZLMediaKit", "name": "ZLMediaKit",
"description": "媒体服务器", "description": "媒体服务器",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"_exporter_id": "29185956", "_exporter_id": "29185956",
"_collection_link": "https://lively-station-598157.postman.co/workspace/%E6%B5%81%E5%AA%92%E4%BD%93%E6%9C%8D%E5%8A%A1~1e119172-45b0-4ed6-b1fc-8a15d0e2d5f8/collection/29185956-08e3bc35-5318-4949-81bb-90d854706194?action=share&source=collection_link&creator=29185956" "_collection_link": "https://lively-station-598157.postman.co/workspace/%E6%B5%81%E5%AA%92%E4%BD%93%E6%9C%8D%E5%8A%A1~1e119172-45b0-4ed6-b1fc-8a15d0e2d5f8/collection/29185956-08c77fc3-7670-428c-bde4-80c8cc9f389f?action=share&source=collection_link&creator=29185956"
}, },
"item": [ "item": [
{ {
@ -1470,9 +1470,9 @@
"disabled": true "disabled": true
}, },
{ {
"key": "only_audio", "key": "only_track",
"value": "1", "value": "1",
"description": "是否为单音频track用于语音对讲", "description": "是否为单音频/单视频track0不设置1单音频2单视频",
"disabled": true "disabled": true
}, },
{ {
@ -1523,9 +1523,9 @@
"description": "该端口绑定的流id\n" "description": "该端口绑定的流id\n"
}, },
{ {
"key": "only_audio", "key": "only_track",
"value": "0", "value": "0",
"description": "是否为单音频track用于语音对讲", "description": "是否为单音频/单视频track0不设置1单音频2单视频",
"disabled": true "disabled": true
}, },
{ {

View File

@ -415,7 +415,7 @@ Value makeMediaSourceJson(MediaSource &media){
} }
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, bool only_audio, bool multiplex) { uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) { if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) {
//为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id //为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id
@ -423,7 +423,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mod
} }
RtpServer::Ptr server = std::make_shared<RtpServer>(); RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_audio, multiplex); server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex);
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id]() {
//设置rtp超时移除事件 //设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
@ -1198,12 +1198,17 @@ void installWebApi() {
//兼容老版本请求新版本去除enable_tcp参数并新增tcp_mode参数 //兼容老版本请求新版本去除enable_tcp参数并新增tcp_mode参数
tcp_mode = 1; tcp_mode = 1;
} }
auto only_track = allArgs["only_track"].as<int>();
if (allArgs["only_audio"].as<bool>()) {
// 兼容老版本请求新版本去除only_audio参数并新增only_track参数
only_track = 1;
}
std::string local_ip = "::"; std::string local_ip = "::";
if (!allArgs["local_ip"].empty()) { if (!allArgs["local_ip"].empty()) {
local_ip = allArgs["local_ip"]; local_ip = allArgs["local_ip"];
} }
auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as<bool>(), auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as<bool>(),
allArgs["ssrc"].as<uint32_t>(), allArgs["only_audio"].as<bool>()); allArgs["ssrc"].as<uint32_t>(), only_track);
if (port == 0) { if (port == 0) {
throw InvalidArgsException("该stream_id已存在"); throw InvalidArgsException("该stream_id已存在");
} }
@ -1220,11 +1225,16 @@ void installWebApi() {
// 兼容老版本请求新版本去除enable_tcp参数并新增tcp_mode参数 // 兼容老版本请求新版本去除enable_tcp参数并新增tcp_mode参数
tcp_mode = 1; tcp_mode = 1;
} }
auto only_track = allArgs["only_track"].as<int>();
if (allArgs["only_audio"].as<bool>()) {
// 兼容老版本请求新版本去除only_audio参数并新增only_track参数
only_track = 1;
}
std::string local_ip = "::"; std::string local_ip = "::";
if (!allArgs["local_ip"].empty()) { if (!allArgs["local_ip"].empty()) {
local_ip = allArgs["local_ip"]; local_ip = allArgs["local_ip"];
} }
auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, true, 0, allArgs["only_audio"].as<bool>(),true); auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, true, 0, only_track,true);
if (port == 0) { if (port == 0) {
throw InvalidArgsException("该stream_id已存在"); throw InvalidArgsException("该stream_id已存在");
} }

View File

@ -233,7 +233,7 @@ void installWebApi();
void unInstallWebApi(); void unInstallWebApi();
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, bool only_audio, bool multiplex=false); uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex=false);
void connectRtpServer(const std::string &stream_id, const std::string &dst_url, uint16_t dst_port, const std::function<void(const toolkit::SockException &ex)> &cb); void connectRtpServer(const std::string &stream_id, const std::string &dst_url, uint16_t dst_port, const std::function<void(const toolkit::SockException &ex)> &cb);
bool closeRtpServer(const std::string &stream_id); bool closeRtpServer(const std::string &stream_id);
#endif #endif

View File

@ -133,7 +133,7 @@ void MediaSink::checkTrackIfReady() {
} }
GET_CONFIG(uint32_t, kMaxAddTrackMS, General::kWaitAddTrackMS); GET_CONFIG(uint32_t, kMaxAddTrackMS, General::kWaitAddTrackMS);
if (_track_map.size() == 1 && _ticker.elapsedTime() > kMaxAddTrackMS) { if (_track_map.size() == 1 && (_ticker.elapsedTime() > kMaxAddTrackMS || !_enable_audio)) {
// 如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track) // 如果只有一个Track那么在该Track添加后我们最多还等待若干时间(可能后面还会添加Track)
emitAllTrackReady(); emitAllTrackReady();
return; return;

View File

@ -199,8 +199,8 @@ void RtpProcess::setStopCheckRtp(bool is_check){
} }
} }
void RtpProcess::setOnlyAudio(bool only_audio){ void RtpProcess::setOnlyTrack(OnlyTrack only_track) {
_only_audio = only_audio; _only_track = only_track;
} }
void RtpProcess::onDetach() { void RtpProcess::onDetach() {
@ -259,8 +259,10 @@ void RtpProcess::emitOnPublish() {
if (!option.stream_replace.empty()) { if (!option.stream_replace.empty()) {
RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace); RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace);
} }
if (strong_self->_only_audio) { switch (strong_self->_only_track) {
strong_self->_muxer->setOnlyAudio(); case kOnlyAudio: strong_self->_muxer->setOnlyAudio(); break;
case kOnlyVideo: strong_self->_muxer->enableAudio(false); break;
default: break;
} }
strong_self->_muxer->setMediaListener(strong_self); strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc(); strong_self->doCachedFunc();

View File

@ -24,6 +24,7 @@ public:
friend class RtpProcessHelper; friend class RtpProcessHelper;
RtpProcess(const std::string &stream_id); RtpProcess(const std::string &stream_id);
~RtpProcess(); ~RtpProcess();
enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 };
/** /**
* rtp * rtp
@ -58,10 +59,10 @@ public:
void setStopCheckRtp(bool is_check=false); void setStopCheckRtp(bool is_check=false);
/** /**
* track * track/
* inputRtp前调用此方法 * inputRtp前调用此方法
*/ */
void setOnlyAudio(bool only_audio); void setOnlyTrack(OnlyTrack only_track);
/** /**
* flush输出缓存 * flush输出缓存
@ -93,7 +94,7 @@ private:
void doCachedFunc(); void doCachedFunc();
private: private:
bool _only_audio = false; OnlyTrack _only_track = kAll;
std::string _auth_err; std::string _auth_err;
uint64_t _dts = 0; uint64_t _dts = 0;
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;

View File

@ -42,12 +42,12 @@ public:
} }
} }
void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc, bool only_audio) { void setRtpServerInfo(uint16_t local_port, RtpServer::TcpMode mode, bool re_use_port, uint32_t ssrc, int only_track) {
_local_port = local_port; _local_port = local_port;
_tcp_mode = mode; _tcp_mode = mode;
_re_use_port = re_use_port; _re_use_port = re_use_port;
_ssrc = ssrc; _ssrc = ssrc;
_only_audio = only_audio; _only_track = only_track;
} }
void setOnDetach(function<void()> cb) { void setOnDetach(function<void()> cb) {
@ -61,7 +61,7 @@ public:
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) { if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnlyAudio(_only_audio); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setOnDetach(std::move(_on_detach)); _process->setOnDetach(std::move(_on_detach));
cancelDelayTask(); cancelDelayTask();
} }
@ -142,7 +142,7 @@ private:
private: private:
bool _re_use_port = false; bool _re_use_port = false;
bool _only_audio = false; int _only_track = 0;
uint16_t _local_port = 0; uint16_t _local_port = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
RtpServer::TcpMode _tcp_mode = RtpServer::NONE; RtpServer::TcpMode _tcp_mode = RtpServer::NONE;
@ -156,7 +156,7 @@ private:
EventPoller::DelayTask::Ptr _delay_task; EventPoller::DelayTask::Ptr _delay_task;
}; };
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, bool only_audio, bool multiplex) { void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
//创建udp服务器 //创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
@ -184,7 +184,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller()); tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kStreamID] = stream_id;
(*tcp_server)[RtpSession::kSSRC] = ssrc; (*tcp_server)[RtpSession::kSSRC] = ssrc;
(*tcp_server)[RtpSession::kOnlyAudio] = only_audio; (*tcp_server)[RtpSession::kOnlyTrack] = only_track;
if (tcp_mode == PASSIVE) { if (tcp_mode == PASSIVE) {
tcp_server->start<RtpSession>(local_port, local_ip); tcp_server->start<RtpSession>(local_port, local_ip);
} else if (stream_id.empty()) { } else if (stream_id.empty()) {
@ -201,7 +201,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id); helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
helper->startRtcp(); helper->startRtcp();
helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_audio); helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track);
bool bind_peer_addr = false; bool bind_peer_addr = false;
auto ssrc_ptr = std::make_shared<uint32_t>(ssrc); auto ssrc_ptr = std::make_shared<uint32_t>(ssrc);
_ssrc = ssrc_ptr; _ssrc = ssrc_ptr;
@ -223,7 +223,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
} else { } else {
//单端口多线程接收多个流根据ssrc区分流 //单端口多线程接收多个流根据ssrc区分流
udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller()); udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller());
(*udp_server)[RtpSession::kOnlyAudio] = only_audio; (*udp_server)[RtpSession::kOnlyTrack] = only_track;
(*udp_server)[RtpSession::kUdpRecvBuffer] = udpRecvSocketBuffer; (*udp_server)[RtpSession::kUdpRecvBuffer] = udpRecvSocketBuffer;
udp_server->start<RtpSession>(local_port, local_ip); udp_server->start<RtpSession>(local_port, local_ip);
rtp_socket = nullptr; rtp_socket = nullptr;

View File

@ -44,7 +44,7 @@ public:
* @param multiplex * @param multiplex
*/ */
void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, bool only_audio = false, bool multiplex = false); const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false);
/** /**
* tcp服务(tcp主动模式) * tcp服务(tcp主动模式)
@ -81,7 +81,7 @@ protected:
std::shared_ptr<RtcpHelper> _rtcp_helper; std::shared_ptr<RtcpHelper> _rtcp_helper;
std::function<void()> _on_cleanup; std::function<void()> _on_cleanup;
bool _only_audio = false; int _only_track = 0;
//用于tcp主动模式 //用于tcp主动模式
TcpMode _tcp_mode = NONE; TcpMode _tcp_mode = NONE;
}; };

View File

@ -23,7 +23,7 @@ namespace mediakit{
const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kSSRC = "ssrc"; const string RtpSession::kSSRC = "ssrc";
const string RtpSession::kOnlyAudio = "only_audio"; const string RtpSession::kOnlyTrack = "only_track";
const string RtpSession::kUdpRecvBuffer = "udp_recv_socket_buffer"; const string RtpSession::kUdpRecvBuffer = "udp_recv_socket_buffer";
void RtpSession::attachServer(const Server &server) { void RtpSession::attachServer(const Server &server) {
@ -33,7 +33,7 @@ void RtpSession::attachServer(const Server &server) {
void RtpSession::setParams(mINI &ini) { void RtpSession::setParams(mINI &ini) {
_stream_id = ini[kStreamID]; _stream_id = ini[kStreamID];
_ssrc = ini[kSSRC]; _ssrc = ini[kSSRC];
_only_audio = ini[kOnlyAudio]; _only_track = ini[kOnlyTrack];
int udp_socket_buffer = ini[kUdpRecvBuffer]; int udp_socket_buffer = ini[kUdpRecvBuffer];
if (_is_udp) { if (_is_udp) {
// 设置udp socket读缓存 // 设置udp socket读缓存
@ -125,7 +125,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_delay_close = true; _delay_close = true;
return; return;
} }
_process->setOnlyAudio(_only_audio); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setDelegate(static_pointer_cast<RtpSession>(shared_from_this())); _process->setDelegate(static_pointer_cast<RtpSession>(shared_from_this()));
} }
try { try {

View File

@ -24,7 +24,7 @@ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSour
public: public:
static const std::string kStreamID; static const std::string kStreamID;
static const std::string kSSRC; static const std::string kSSRC;
static const std::string kOnlyAudio; static const std::string kOnlyTrack;
static const std::string kUdpRecvBuffer; static const std::string kUdpRecvBuffer;
RtpSession(const toolkit::Socket::Ptr &sock); RtpSession(const toolkit::Socket::Ptr &sock);
@ -52,7 +52,7 @@ private:
bool _is_udp = false; bool _is_udp = false;
bool _search_rtp = false; bool _search_rtp = false;
bool _search_rtp_finished = false; bool _search_rtp_finished = false;
bool _only_audio = false; int _only_track = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
toolkit::Ticker _ticker; toolkit::Ticker _ticker;
std::string _stream_id; std::string _stream_id;