diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 0b27c5e5..148a5992 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -400,7 +400,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mod //设置rtp超时移除事件 lock_guard lck(s_rtpServerMapMtx); s_rtpServerMap.erase(stream_id); - }); + }); //保存对象 s_rtpServerMap.emplace(stream_id, server); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index f79004e6..099bd876 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -66,18 +66,6 @@ RtpProcess::~RtpProcess() { } bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { - auto is_busy = _busy_flag.test_and_set(); - if (is_busy) { - //其他线程正在执行本函数 - WarnP(this) << "其他线程正在执行本函数"; - return false; - } - //没有其他线程执行本函数 - onceToken token(nullptr, [&]() { - //本函数执行完毕时,释放状态 - _busy_flag.clear(); - }); - if (!_sock) { //第一次运行本函数 _sock = sock; diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 2c27c143..3c3244a2 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -94,7 +94,6 @@ private: ProcessInterface::Ptr _process; MultiMediaSourceMuxer::Ptr _muxer; std::atomic_bool _stop_rtp_check{false}; - std::atomic_flag _busy_flag{false}; toolkit::Ticker _last_check_alive; std::recursive_mutex _func_mtx; std::deque > _cached_func; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index cf9ae26d..66d44828 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -25,25 +25,6 @@ void RtpSelector::clear(){ _map_rtp_process.clear(); } -bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, size_t data_len, const struct sockaddr *addr, - uint64_t *dts_out) { - uint32_t ssrc = 0; - if (!getSSRC(data, data_len, ssrc)) { - WarnL << "get ssrc from rtp failed:" << data_len; - return false; - } - auto process = getProcess(printSSRC(ssrc), true); - if (process) { - try { - return process->inputRtp(true, sock, data, data_len, addr, dts_out); - } catch (...) { - delProcess(printSSRC(ssrc), process.get()); - throw; - } - } - return false; -} - bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ if (data_len < 12) { return false; @@ -59,6 +40,10 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { if (it == _map_rtp_process.end() && !makeNew) { return nullptr; } + if (it != _map_rtp_process.end() && makeNew) { + //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 + throw std::runtime_error(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); + } RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; if (!ref) { ref = std::make_shared(stream_id, shared_from_this()); diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 746abacb..96888598 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -52,22 +52,10 @@ public: */ void clear(); - /** - * 输入多个rtp流,根据ssrc分流 - * @param sock 本地socket - * @param data 收到的数据 - * @param data_len 收到的数据长度 - * @param addr rtp流源地址 - * @param dts_out 解析出最新的dts - * @return 是否成功 - */ - bool inputRtp(const toolkit::Socket::Ptr &sock, const char *data, size_t data_len, - const struct sockaddr *addr, uint64_t *dts_out = nullptr); - /** * 获取一个rtp处理器 * @param stream_id 流id - * @param makeNew 不存在时是否新建 + * @param makeNew 不存在时是否新建, 该参数为true时,必须确保之前未创建同名对象 * @return rtp处理器 */ RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 441d1563..bafd793c 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -42,6 +42,8 @@ static bool loadFile(const char *path){ addr.ss_family = AF_INET; auto sock = Socket::createSocket(); size_t total_size = 0; + RtpProcess::Ptr process; + uint32_t ssrc = 0; while (true) { if (2 != fread(&len, 1, 2, fp)) { WarnL; @@ -58,9 +60,24 @@ static bool loadFile(const char *path){ break; } total_size += len; - uint64_t timeStamp; + uint64_t timeStamp = 0; + + if (!process) { + if (!RtpSelector::getSSRC(rtp, len, ssrc)) { + WarnL << "get ssrc from rtp failed:" << len; + return false; + } + process = RtpSelector::Instance().getProcess(printSSRC(ssrc), true); + } + if (process) { + try { + process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &timeStamp); + } catch (...) { + RtpSelector::Instance().delProcess(printSSRC(ssrc), process.get()); + throw; + } + } - RtpSelector::Instance().inputRtp(sock, rtp, len, (struct sockaddr *)&addr, &timeStamp); auto diff = timeStamp - timeStamp_last; if (diff > 0 && diff < 500) { usleep(diff * 1000);