diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 80de4984..1980ee02 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -741,15 +741,12 @@ void installWebApi() { }); #if defined(ENABLE_RTPPROXY) - api_regist1("/index/api/getSsrcInfo",[](API_ARGS1){ + api_regist1("/index/api/getRtpInfo",[](API_ARGS1){ CHECK_SECRET(); - CHECK_ARGS("ssrc"); - uint32_t ssrc = 0; - stringstream ss(allArgs["ssrc"]); - ss >> std::hex >> ssrc; + CHECK_ARGS("stream_id"); - auto process = RtpSelector::Instance().getProcess(ssrc,false); - if(!process){ + auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + if (!process) { val["exist"] = false; return; } @@ -760,10 +757,10 @@ void installWebApi() { api_regist1("/index/api/openRtpServer",[](API_ARGS1){ CHECK_SECRET(); - CHECK_ARGS("port", "enable_tcp"); + CHECK_ARGS("port", "enable_tcp", "stream_id"); RtpServer::Ptr server = std::make_shared(); - server->start(allArgs["port"], allArgs["enable_tcp"].as()); + server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as()); val["port"] = server->getPort(); //保存对象 diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 08923216..b3f32d19 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -16,32 +16,21 @@ namespace mediakit{ -string printSSRC(uint32_t ui32Ssrc) { - char tmp[9] = { 0 }; - ui32Ssrc = htonl(ui32Ssrc); - uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; - for (int i = 0; i < 4; i++) { - sprintf(tmp + 2 * i, "%02X", pSsrc[i]); - } - return tmp; -} - static string printAddress(const struct sockaddr *addr){ return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); } -RtpProcess::RtpProcess(uint32_t ssrc) { - _ssrc = ssrc; +RtpProcess::RtpProcess(const string &stream_id) { _track = std::make_shared(); _track->_interleaved = 0; _track->_samplerate = 90000; _track->_type = TrackVideo; - _track->_ssrc = _ssrc; + _track->_ssrc = 0; _media_info._schema = RTP_APP_NAME; _media_info._vhost = DEFAULT_VHOST; _media_info._app = RTP_APP_NAME; - _media_info._streamid = printSSRC(_ssrc); + _media_info._streamid = stream_id; GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 7b054292..0c55e375 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -22,11 +22,10 @@ using namespace mediakit; namespace mediakit{ -string printSSRC(uint32_t ui32Ssrc); class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; - RtpProcess(uint32_t ssrc); + RtpProcess(const string &stream_id); ~RtpProcess(); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); @@ -54,7 +53,6 @@ private: std::shared_ptr _save_file_rtp; std::shared_ptr _save_file_ps; std::shared_ptr _save_file_video; - uint32_t _ssrc; SdpTrack::Ptr _track; struct sockaddr *_addr = nullptr; uint16_t _sequence = 0; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index d40e5ceb..36d8f8bc 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,37 +15,44 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { - uint32_t ssrc = 0; - if(!getSSRC(data,data_len,ssrc)){ - WarnL << "get ssrc from rtp failed:" << data_len; - return false; +bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + const struct sockaddr *addr,uint32_t *dts_out) { + if (stream_id.empty()) { + //未指定流id,那么使用ssrc为流id + uint32_t ssrc = 0; + if (!getSSRC(data, data_len, ssrc)) { + WarnL << "get ssrc from rtp failed:" << data_len; + return false; + } + stream_id = printSSRC(ssrc); } - auto process = getProcess(ssrc, true); - if(process){ - return process->inputRtp(sock, data,data_len, addr,dts_out); + + //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) + auto process = getProcess(stream_id, true); + if (process) { + return process->inputRtp(sock, data, data_len, addr, dts_out); } return false; } bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){ - if(data_len < 12){ + if (data_len < 12) { return false; } - uint32_t *ssrc_ptr = (uint32_t *)(data + 8); + uint32_t *ssrc_ptr = (uint32_t *) (data + 8); ssrc = ntohl(*ssrc_ptr); return true; } -RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) { +RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(ssrc); - if(it == _map_rtp_process.end() && !makeNew){ + auto it = _map_rtp_process.find(stream_id); + if (it == _map_rtp_process.end() && !makeNew) { return nullptr; } - RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc]; - if(!ref){ - ref = std::make_shared(ssrc,shared_from_this()); + RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; + if (!ref) { + ref = std::make_shared(stream_id, shared_from_this()); ref->attachEvent(); createTimer(); } @@ -67,17 +74,15 @@ void RtpSelector::createTimer() { } } -void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { +void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) { lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(ssrc); - if(it == _map_rtp_process.end()){ + auto it = _map_rtp_process.find(stream_id); + if (it == _map_rtp_process.end()) { return; } - - if(it->second->getProcess().get() != ptr){ + if (it->second->getProcess().get() != ptr) { return; } - _map_rtp_process.erase(it); } @@ -88,7 +93,7 @@ void RtpSelector::onManager() { ++it; continue; } - WarnL << "RtpProcess timeout:" << printSSRC(it->first); + WarnL << "RtpProcess timeout:" << it->first; it = _map_rtp_process.erase(it); } } @@ -99,10 +104,10 @@ RtpSelector::RtpSelector() { RtpSelector::~RtpSelector() { } -RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr &parent) { - _ssrc = ssrc; +RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { + _stream_id = stream_id; _parent = parent; - _process = std::make_shared(_ssrc); + _process = std::make_shared(stream_id); } RtpProcessHelper::~RtpProcessHelper() { @@ -114,14 +119,14 @@ void RtpProcessHelper::attachEvent() { bool RtpProcessHelper::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if(!_process || (!force && _process->totalReaderCount())){ + if (!_process || (!force && _process->totalReaderCount())) { return false; } auto parent = _parent.lock(); - if(!parent){ + if (!parent) { return false; } - parent->delProcess(_ssrc,_process.get()); + parent->delProcess(_stream_id, _process.get()); WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index d6354f48..adb6f1a5 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -24,19 +24,21 @@ class RtpSelector; class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; - RtpProcessHelper(uint32_t ssrc,const weak_ptr &parent); + RtpProcessHelper(const string &stream_id, const weak_ptr &parent); ~RtpProcessHelper(); void attachEvent(); RtpProcess::Ptr & getProcess(); + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; + private: weak_ptr _parent; RtpProcess::Ptr _process; - uint32_t _ssrc = 0; + string _stream_id; }; class RtpSelector : public std::enable_shared_from_this{ @@ -44,16 +46,21 @@ public: RtpSelector(); ~RtpSelector(); - static RtpSelector &Instance(); - bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); - RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); - void delProcess(uint32_t ssrc,const RtpProcess *ptr); + static RtpSelector &Instance(); + + bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + const struct sockaddr *addr, uint32_t *dts_out = nullptr); + + RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew); + void delProcess(const string &stream_id, const RtpProcess *ptr); + private: void onManager(); void createTimer(); + private: - unordered_map _map_rtp_process; + unordered_map _map_rtp_process; recursive_mutex _mtx_map; Timer::Ptr _timer; }; diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 5e90c1f7..466b6a0c 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -22,12 +22,12 @@ RtpServer::~RtpServer() { } } -void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_ip) { +void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { _udp_server.reset(new Socket(nullptr, false)); auto &ref = RtpSelector::Instance(); auto sock = _udp_server; - _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - ref.inputRtp(sock, buf->data(), buf->size(), addr); + _udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, const_cast(stream_id), buf->data(), buf->size(), addr); }); //创建udp服务器 @@ -43,6 +43,7 @@ void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_i try { //创建tcp服务器 _tcp_server = std::make_shared(_udp_server->getPoller()); + (*_tcp_server)[RtpSession::kStreamID] = stream_id; _tcp_server->start(_udp_server->get_local_port(), local_ip); } catch (...) { _tcp_server = nullptr; diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index c81f9513..7c406903 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -36,10 +36,11 @@ public: /** * 开启服务器,可能抛异常 * @param local_port 本地端口,0时为随机端口 + * @param stream_id 流id,置空则使用ssrc * @param enable_tcp 是否启用tcp服务器 * @param local_ip 绑定的本地网卡ip */ - void start(uint16_t local_port, bool enable_tcp = true, const char *local_ip = "0.0.0.0"); + void start(uint16_t local_port, const string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0"); /** * 获取绑定的本地端口 diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index b51bbda4..2e3f63fe 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -11,8 +11,15 @@ #if defined(ENABLE_RTPPROXY) #include "RtpSession.h" #include "RtpSelector.h" +#include "Network/TcpServer.h" namespace mediakit{ +const string RtpSession::kStreamID = "stream_id"; + +void RtpSession::attachServer(const TcpServer &server) { + _stream_id = const_cast(server)[kStreamID]; +} + RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); socklen_t addr_len = sizeof(addr); @@ -21,7 +28,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { RtpSession::~RtpSession() { DebugP(this); if(_process){ - RtpSelector::Instance().delProcess(_ssrc,_process.get()); + RtpSelector::Instance().delProcess(_stream_id,_process.get()); } } @@ -36,7 +43,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) { } void RtpSession::onError(const SockException &err) { - WarnL << _ssrc << " " << err.what(); + WarnL << _stream_id << " " << err.what(); } void RtpSession::onManager() { @@ -51,13 +58,19 @@ void RtpSession::onManager() { void RtpSession::onRtpPacket(const char *data, uint64_t len) { if (!_process) { - if (!RtpSelector::getSSRC(data + 2, len - 2, _ssrc)) { + uint32_t ssrc; + if (!RtpSelector::getSSRC(data + 2, len - 2, ssrc)) { return; } - _process = RtpSelector::Instance().getProcess(_ssrc, true); + if (_stream_id.empty()) { + //未指定流id就使用ssrc为流id + _stream_id = printSSRC(ssrc); + } + //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess + _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(_sock,data + 2, len - 2, &addr); + _process->inputRtp(_sock, data + 2, len - 2, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index a3557029..07c9166b 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -22,22 +22,26 @@ namespace mediakit{ class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{ public: + static const string kStreamID; RtpSession(const Socket::Ptr &sock); ~RtpSession() override; void onRecv(const Buffer::Ptr &) override; void onError(const SockException &err) override; void onManager() override; + void attachServer(const TcpServer &server) override; + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; void onRtpPacket(const char *data,uint64_t len) override; + private: - uint32_t _ssrc = 0; RtpProcess::Ptr _process; Ticker _ticker; struct sockaddr addr; + string _stream_id; }; }//namespace mediakit diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index d5d68511..7d6d0b42 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -38,6 +38,7 @@ static bool loadFile(const char *path){ uint16_t len; char rtp[2 * 1024]; struct sockaddr addr = {0}; + string stream_id; while (true) { if (2 != fread(&len, 1, 2, fp)) { WarnL; @@ -55,7 +56,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp); + RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0 && diff < 500){