diff --git a/conf/config.ini b/conf/config.ini index 7c100a9e..59a7f06b 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -199,8 +199,6 @@ audioMtuSize=600 videoMtuSize=1400 [rtp_proxy] -#udp类型的代理服务器是否检查rtp源地址,地址不配备将丢弃数据 -checkSource=1 #导出调试数据(包括rtp/ps/h264)至该目录,置空则关闭数据导出 dumpDir= #udp和tcp代理服务器,支持rtp(必须是ts或ps类型)代理 diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 996f4f6a..41c59083 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -281,14 +281,11 @@ namespace RtpProxy { #define RTP_PROXY_FIELD "rtp_proxy." //rtp调试数据保存目录 const string kDumpDir = RTP_PROXY_FIELD"dumpDir"; -//是否限制udp数据来源ip和端口 -const string kCheckSource = RTP_PROXY_FIELD"checkSource"; //rtp接收超时时间 const string kTimeoutSec = RTP_PROXY_FIELD"timeoutSec"; onceToken token([](){ mINI::Instance()[kDumpDir] = ""; - mINI::Instance()[kCheckSource] = 1; mINI::Instance()[kTimeoutSec] = 15; },nullptr); } //namespace RtpProxy diff --git a/src/Common/config.h b/src/Common/config.h index 7b67438b..6b26ab3c 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -301,8 +301,6 @@ extern const string kBroadcastRecordTs; namespace RtpProxy { //rtp调试数据保存目录,置空则不生成 extern const string kDumpDir; -//是否限制udp数据来源ip和端口 -extern const string kCheckSource; //rtp接收超时时间 extern const string kTimeoutSec; } //namespace RtpProxy diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index f314bd47..64a15a3a 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -11,18 +11,12 @@ #if defined(ENABLE_RTPPROXY) #include "GB28181Process.h" #include "RtpProcess.h" -#include "RtpSplitter.h" -#include "Util/File.h" #include "Http/HttpTSPlayer.h" #define RTP_APP_NAME "rtp" namespace mediakit { -static string printAddress(const struct sockaddr *addr) { - return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); -} - RtpProcess::RtpProcess(const string &stream_id) { _media_info._schema = RTP_APP_NAME; _media_info._vhost = DEFAULT_VHOST; @@ -63,23 +57,17 @@ RtpProcess::~RtpProcess() { if (_total_bytes >= iFlowThreshold * 1024) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast(*this)); } - - if (_addr) { - delete _addr; - _addr = nullptr; - } } bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint32_t *dts_out) { - GET_CONFIG(bool, check_source, RtpProxy::kCheckSource); - //检查源是否合法 - if (!_addr) { - _addr = new struct sockaddr; + if (!_sock) { _sock = sock; - memcpy(_addr, addr, sizeof(struct sockaddr)); - DebugP(this) << "bind to address:" << printAddress(_addr); - //推流鉴权 + _addr = *addr; emitOnPublish(); + } else if (!_sock->getPoller()->isCurrentThread()) { + //其他线程执行本对象,存在线程安全问题 + WarnP(this) << "其他线程执行本对象"; + return false; } if (!_muxer) { @@ -87,11 +75,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data return false; } - if (check_source && memcmp(_addr, addr, sizeof(struct sockaddr)) != 0) { - DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); - return false; - } - _total_bytes += len; if (_save_file_rtp) { uint16_t size = (uint16_t)len; @@ -161,17 +144,11 @@ void RtpProcess::setOnDetach(const function &cb) { } string RtpProcess::get_peer_ip() { - if (_addr) { - return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); - } - return "0.0.0.0"; + return SockUtil::inet_ntoa(((struct sockaddr_in &) _addr).sin_addr); } uint16_t RtpProcess::get_peer_port() { - if (!_addr) { - return 0; - } - return ntohs(((struct sockaddr_in *) _addr)->sin_port); + return ntohs(((struct sockaddr_in &) _addr).sin_port); } string RtpProcess::get_local_ip() { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 3b78f35f..ffb9ace4 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -83,7 +83,7 @@ private: private: uint32_t _dts = 0; uint64_t _total_bytes = 0; - struct sockaddr *_addr = nullptr; + struct sockaddr _addr{0}; Socket::Ptr _sock; MediaInfo _media_info; Ticker _last_frame_time; diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 2c4b521e..2cb2f593 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -28,43 +28,55 @@ using namespace mediakit; #if defined(ENABLE_RTPPROXY) static bool loadFile(const char *path){ - FILE *fp = fopen(path, "rb"); + std::shared_ptr fp(fopen(path, "rb"), [](FILE *fp) { + if (fp) { + fclose(fp); + } + }); if (!fp) { WarnL << "open file failed:" << path; return false; } + semaphore sem; + uint16_t len = 0; uint32_t timeStamp_last = 0; - uint16_t len; char rtp[2 * 1024]; struct sockaddr addr = {0}; - while (true) { - if (2 != fread(&len, 1, 2, fp)) { + auto sock = Socket::createSocket(); + + sock->getPoller()->doDelayTask(0, [&]() mutable -> uint64_t { + if (2 != fread(&len, 1, 2, fp.get())) { WarnL; - break; + sem.post(); + return 0; } len = ntohs(len); if (len < 12 || len > sizeof(rtp)) { WarnL << len; - break; + sem.post(); + return 0; } - if (len != fread(rtp, 1, len, fp)) { + if (len != fread(rtp, 1, len, fp.get())) { WarnL; - break; + sem.post(); + return 0; } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp); - if(timeStamp_last){ + RtpSelector::Instance().inputRtp(sock, rtp, len, &addr, &timeStamp); + if (timeStamp_last) { auto diff = timeStamp - timeStamp_last; - if(diff > 0 && diff < 500){ - usleep(diff * 1000); + if (diff > 0 && diff < 500) { + timeStamp_last = timeStamp; + return diff; } } timeStamp_last = timeStamp; - } - fclose(fp); + return 1; + }); + sem.wait(); return true; } #endif//#if defined(ENABLE_RTPPROXY) @@ -85,10 +97,11 @@ int main(int argc,char *argv[]) { //此处选择是否导出调试文件 // mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; - if (argc == 2) - loadFile(argv[1]); - else - ErrorL << "parameter error."; + if (argc == 2) { + loadFile(argv[1]); + } else { + ErrorL << "parameter error."; + } #else ErrorL << "please ENABLE_RTPPROXY and then test"; #endif//#if defined(ENABLE_RTPPROXY)