From a106f8dfc00e04ff5f677fb62ee7be1e54ed65ac Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sun, 31 Dec 2023 22:10:13 +0800 Subject: [PATCH] Avoid blocking the poller thread --- tests/test_rtp.cpp | 139 ++++++++++++++++++++++----------------------- 1 file changed, 68 insertions(+), 71 deletions(-) diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 5bdffa12..9bb7e4bc 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -10,11 +10,8 @@ #include #include -#include "Util/MD5.h" -#include "Util/File.h" -#include "Util/logger.h" -#include "Util/SSLBox.h" #include "Util/util.h" +#include "Util/logger.h" #include "Network/TcpServer.h" #include "Common/config.h" #include "Rtsp/RtspSession.h" @@ -29,102 +26,102 @@ using namespace mediakit; static semaphore sem; #if defined(ENABLE_RTPPROXY) -static bool loadFile(const char *path, const EventPoller::Ptr &poller){ - FILE *fp = fopen(path, "rb"); +static bool loadFile(const char *path, const EventPoller::Ptr &poller) { + std::shared_ptr fp(fopen(path, "rb"), [](FILE *fp) { + sem.post(); + if (fp) { + fclose(fp); + } + }); if (!fp) { WarnL << "open file failed:" << path; return false; } - uint64_t timeStamp_last = 0; - uint16_t len; - char rtp[0xFFFF]; struct sockaddr_storage addr; memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; auto sock = Socket::createSocket(poller); - size_t total_size = 0; - RtpProcess::Ptr process; - uint32_t ssrc = 0; - while (true) { - if (2 != fread(&len, 1, 2, fp)) { - WarnL; - break; - } - len = ntohs(len); - if (len < 12 || len > sizeof(rtp)) { - WarnL << len; - break; - } + auto process = RtpSelector::Instance().getProcess("test", true); - if (len != fread(rtp, 1, len, fp)) { - WarnL; - break; - } - total_size += len; - uint64_t timeStamp = 0; - - if (!process) { - if (!RtpSelector::getSSRC(rtp, len, ssrc)) { - WarnL << "get ssrc from rtp failed:" << len; - return false; + uint64_t stamp_last = 0; + auto total_size = std::make_shared(0); + auto do_read = [fp, total_size, sock, addr, process, stamp_last]() mutable -> int { + uint16_t len; + char rtp[0xFFFF]; + while (true) { + if (2 != fread(&len, 1, 2, fp.get())) { + WarnL << "Read rtp size failed"; + // 重新播放 + fseek(fp.get(), 0, SEEK_SET); + return 1; } - process = RtpSelector::Instance().getProcess(printSSRC(ssrc), true); - } - if (process) { + len = ntohs(len); + if (len < 12 || len > sizeof(rtp)) { + WarnL << "Invalid rtp size: " << len; + return 0; + } + + if (len != fread(rtp, 1, len, fp.get())) { + WarnL << "Read rtp data failed"; + return 0; + } + (*total_size) += len; + uint64_t stamp = 0; try { - process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &timeStamp); - } catch (...) { - RtpSelector::Instance().delProcess(printSSRC(ssrc), process.get()); - throw; + process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &stamp); + } catch (std::exception &ex) { + WarnL << "Input rtp failed: " << ex.what(); + return 0; + } + + auto diff = stamp - stamp_last; + if (diff < 0 || diff > 500) { + diff = 1; + } + if (diff) { + stamp_last = stamp; + return diff; } } - - auto diff = timeStamp - timeStamp_last; - if (diff > 0 && diff < 500) { - usleep(diff * 1000); - } else { - usleep(1 * 1000); + }; + poller->doDelayTask(1, [do_read, total_size, process]() mutable { + auto ret = do_read(); + if (!ret) { + WarnL << *total_size / 1024 << "KB"; + RtpSelector::Instance().delProcess("test", process.get()); } - timeStamp_last = timeStamp; - } - WarnL << total_size / 1024 << "KB"; - fclose(fp); + return ret; + }); + return true; } -#endif//#if defined(ENABLE_RTPPROXY) +#endif // #if defined(ENABLE_RTPPROXY) -int main(int argc,char *argv[]) { - //设置日志 +int main(int argc, char *argv[]) { + // 设置日志 Logger::Instance().add(std::make_shared("ConsoleChannel")); #if defined(ENABLE_RTPPROXY) - //启动异步日志线程 + // 启动异步日志线程 Logger::Instance().setWriter(std::make_shared()); loadIniConfig((exeDir() + "config.ini").data()); TcpServer::Ptr rtspSrv(new TcpServer()); TcpServer::Ptr rtmpSrv(new TcpServer()); TcpServer::Ptr httpSrv(new TcpServer()); - rtspSrv->start(554);//默认554 - rtmpSrv->start(1935);//默认1935 - httpSrv->start(80);//默认80 - //此处选择是否导出调试文件 -// mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; + rtspSrv->start(554); // 默认554 + rtmpSrv->start(1935); // 默认1935 + httpSrv->start(80); // 默认80 + // 此处选择是否导出调试文件 + // mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; - if (argc == 2){ - auto poller = EventPollerPool::Instance().getPoller(); - poller->async_first([poller,argv](){ - loadFile(argv[1],poller); - sem.post(); - }); + if (argc == 2) { + loadFile(argv[1], EventPollerPool::Instance().getPoller()); sem.wait(); - sleep(1); + } else { + ErrorL << "parameter error."; } - else - ErrorL << "parameter error."; #else ErrorL << "please ENABLE_RTPPROXY and then test"; -#endif//#if defined(ENABLE_RTPPROXY) +#endif // #if defined(ENABLE_RTPPROXY) return 0; } - -