diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 64a15a3a..1559628e 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -22,7 +22,6 @@ RtpProcess::RtpProcess(const string &stream_id) { _media_info._vhost = DEFAULT_VHOST; _media_info._app = RTP_APP_NAME; _media_info._streamid = stream_id; - _stop_rtp_check.store(false); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); { @@ -60,14 +59,23 @@ RtpProcess::~RtpProcess() { } bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint32_t *dts_out) { + auto is_busy = _busy_flag.test_and_set(); + if (is_busy) { + //其他线程正在执行本函数 + WarnP(this) << "其他线程正在执行本函数"; + return false; + } + //没有其他线程执行本函数 + onceToken token(nullptr, [&]() { + //本函数执行完毕时,释放状态 + _busy_flag.clear(); + }); + if (!_sock) { + //第一次运行本函数 _sock = sock; _addr = *addr; emitOnPublish(); - } else if (!_sock->getPoller()->isCurrentThread()) { - //其他线程执行本对象,存在线程安全问题 - WarnP(this) << "其他线程执行本对象"; - return false; } if (!_muxer) { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index ffb9ace4..d8084d73 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -92,7 +92,8 @@ private: std::shared_ptr _save_file_video; ProcessInterface::Ptr _process; MultiMediaSourceMuxer::Ptr _muxer; - std::atomic_bool _stop_rtp_check; + atomic_bool _stop_rtp_check{false}; + atomic_flag _busy_flag{false}; }; }//namespace mediakit