重写rtp去冲突逻辑

This commit is contained in:
xia-chu 2021-02-04 18:20:59 +08:00
parent d6fc56d950
commit b0e1d5d6aa
6 changed files with 40 additions and 57 deletions

View File

@ -199,8 +199,6 @@ audioMtuSize=600
videoMtuSize=1400 videoMtuSize=1400
[rtp_proxy] [rtp_proxy]
#udp类型的代理服务器是否检查rtp源地址地址不配备将丢弃数据
checkSource=1
#导出调试数据(包括rtp/ps/h264)至该目录,置空则关闭数据导出 #导出调试数据(包括rtp/ps/h264)至该目录,置空则关闭数据导出
dumpDir= dumpDir=
#udp和tcp代理服务器支持rtp(必须是ts或ps类型)代理 #udp和tcp代理服务器支持rtp(必须是ts或ps类型)代理

View File

@ -281,14 +281,11 @@ namespace RtpProxy {
#define RTP_PROXY_FIELD "rtp_proxy." #define RTP_PROXY_FIELD "rtp_proxy."
//rtp调试数据保存目录 //rtp调试数据保存目录
const string kDumpDir = RTP_PROXY_FIELD"dumpDir"; const string kDumpDir = RTP_PROXY_FIELD"dumpDir";
//是否限制udp数据来源ip和端口
const string kCheckSource = RTP_PROXY_FIELD"checkSource";
//rtp接收超时时间 //rtp接收超时时间
const string kTimeoutSec = RTP_PROXY_FIELD"timeoutSec"; const string kTimeoutSec = RTP_PROXY_FIELD"timeoutSec";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kDumpDir] = ""; mINI::Instance()[kDumpDir] = "";
mINI::Instance()[kCheckSource] = 1;
mINI::Instance()[kTimeoutSec] = 15; mINI::Instance()[kTimeoutSec] = 15;
},nullptr); },nullptr);
} //namespace RtpProxy } //namespace RtpProxy

View File

@ -301,8 +301,6 @@ extern const string kBroadcastRecordTs;
namespace RtpProxy { namespace RtpProxy {
//rtp调试数据保存目录,置空则不生成 //rtp调试数据保存目录,置空则不生成
extern const string kDumpDir; extern const string kDumpDir;
//是否限制udp数据来源ip和端口
extern const string kCheckSource;
//rtp接收超时时间 //rtp接收超时时间
extern const string kTimeoutSec; extern const string kTimeoutSec;
} //namespace RtpProxy } //namespace RtpProxy

View File

@ -11,18 +11,12 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "GB28181Process.h" #include "GB28181Process.h"
#include "RtpProcess.h" #include "RtpProcess.h"
#include "RtpSplitter.h"
#include "Util/File.h"
#include "Http/HttpTSPlayer.h" #include "Http/HttpTSPlayer.h"
#define RTP_APP_NAME "rtp" #define RTP_APP_NAME "rtp"
namespace mediakit { namespace mediakit {
static string printAddress(const struct sockaddr *addr) {
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
}
RtpProcess::RtpProcess(const string &stream_id) { RtpProcess::RtpProcess(const string &stream_id) {
_media_info._schema = RTP_APP_NAME; _media_info._schema = RTP_APP_NAME;
_media_info._vhost = DEFAULT_VHOST; _media_info._vhost = DEFAULT_VHOST;
@ -63,23 +57,17 @@ RtpProcess::~RtpProcess() {
if (_total_bytes >= iFlowThreshold * 1024) { if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
} }
if (_addr) {
delete _addr;
_addr = nullptr;
}
} }
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint32_t *dts_out) { bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint32_t *dts_out) {
GET_CONFIG(bool, check_source, RtpProxy::kCheckSource); if (!_sock) {
//检查源是否合法
if (!_addr) {
_addr = new struct sockaddr;
_sock = sock; _sock = sock;
memcpy(_addr, addr, sizeof(struct sockaddr)); _addr = *addr;
DebugP(this) << "bind to address:" << printAddress(_addr);
//推流鉴权
emitOnPublish(); emitOnPublish();
} else if (!_sock->getPoller()->isCurrentThread()) {
//其他线程执行本对象,存在线程安全问题
WarnP(this) << "其他线程执行本对象";
return false;
} }
if (!_muxer) { if (!_muxer) {
@ -87,11 +75,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
return false; return false;
} }
if (check_source && memcmp(_addr, addr, sizeof(struct sockaddr)) != 0) {
DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
return false;
}
_total_bytes += len; _total_bytes += len;
if (_save_file_rtp) { if (_save_file_rtp) {
uint16_t size = (uint16_t)len; uint16_t size = (uint16_t)len;
@ -161,17 +144,11 @@ void RtpProcess::setOnDetach(const function<void()> &cb) {
} }
string RtpProcess::get_peer_ip() { string RtpProcess::get_peer_ip() {
if (_addr) { return SockUtil::inet_ntoa(((struct sockaddr_in &) _addr).sin_addr);
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
}
return "0.0.0.0";
} }
uint16_t RtpProcess::get_peer_port() { uint16_t RtpProcess::get_peer_port() {
if (!_addr) { return ntohs(((struct sockaddr_in &) _addr).sin_port);
return 0;
}
return ntohs(((struct sockaddr_in *) _addr)->sin_port);
} }
string RtpProcess::get_local_ip() { string RtpProcess::get_local_ip() {

View File

@ -83,7 +83,7 @@ private:
private: private:
uint32_t _dts = 0; uint32_t _dts = 0;
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;
struct sockaddr *_addr = nullptr; struct sockaddr _addr{0};
Socket::Ptr _sock; Socket::Ptr _sock;
MediaInfo _media_info; MediaInfo _media_info;
Ticker _last_frame_time; Ticker _last_frame_time;

View File

@ -28,43 +28,55 @@ using namespace mediakit;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
static bool loadFile(const char *path){ static bool loadFile(const char *path){
FILE *fp = fopen(path, "rb"); std::shared_ptr<FILE> fp(fopen(path, "rb"), [](FILE *fp) {
if (fp) {
fclose(fp);
}
});
if (!fp) { if (!fp) {
WarnL << "open file failed:" << path; WarnL << "open file failed:" << path;
return false; return false;
} }
semaphore sem;
uint16_t len = 0;
uint32_t timeStamp_last = 0; uint32_t timeStamp_last = 0;
uint16_t len;
char rtp[2 * 1024]; char rtp[2 * 1024];
struct sockaddr addr = {0}; struct sockaddr addr = {0};
while (true) { auto sock = Socket::createSocket();
if (2 != fread(&len, 1, 2, fp)) {
sock->getPoller()->doDelayTask(0, [&]() mutable -> uint64_t {
if (2 != fread(&len, 1, 2, fp.get())) {
WarnL; WarnL;
break; sem.post();
return 0;
} }
len = ntohs(len); len = ntohs(len);
if (len < 12 || len > sizeof(rtp)) { if (len < 12 || len > sizeof(rtp)) {
WarnL << len; WarnL << len;
break; sem.post();
return 0;
} }
if (len != fread(rtp, 1, len, fp)) { if (len != fread(rtp, 1, len, fp.get())) {
WarnL; WarnL;
break; sem.post();
return 0;
} }
uint32_t timeStamp; uint32_t timeStamp;
RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp); RtpSelector::Instance().inputRtp(sock, rtp, len, &addr, &timeStamp);
if (timeStamp_last) { if (timeStamp_last) {
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if (diff > 0 && diff < 500) { if (diff > 0 && diff < 500) {
usleep(diff * 1000); timeStamp_last = timeStamp;
return diff;
} }
} }
timeStamp_last = timeStamp; timeStamp_last = timeStamp;
} return 1;
fclose(fp); });
sem.wait();
return true; return true;
} }
#endif//#if defined(ENABLE_RTPPROXY) #endif//#if defined(ENABLE_RTPPROXY)
@ -85,10 +97,11 @@ int main(int argc,char *argv[]) {
//此处选择是否导出调试文件 //此处选择是否导出调试文件
// mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; // mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/";
if (argc == 2) if (argc == 2) {
loadFile(argv[1]); loadFile(argv[1]);
else } else {
ErrorL << "parameter error."; ErrorL << "parameter error.";
}
#else #else
ErrorL << "please ENABLE_RTPPROXY and then test"; ErrorL << "please ENABLE_RTPPROXY and then test";
#endif//#if defined(ENABLE_RTPPROXY) #endif//#if defined(ENABLE_RTPPROXY)