支持日志上下文

This commit is contained in:
xiongziliang 2023-11-20 21:59:43 +08:00
parent a13b8417e5
commit 8bd7157ca1
14 changed files with 32 additions and 20 deletions

@ -1 +1 @@
Subproject commit ad44a16c99834540b397774ad6c7f3f8ed619d56 Subproject commit 83fc31c8d80499422378403e49cde35a53fda14b

View File

@ -448,6 +448,7 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &s
} }
poller->async([weak_session, cancel_all, info, cb_once]() { poller->async([weak_session, cancel_all, info, cb_once]() {
Logger::setThreadContext(weak_session);
cancel_all(); cancel_all();
if (auto strong_session = weak_session.lock()) { if (auto strong_session = weak_session.lock()) {
//播发器请求的流终于注册上了,切换到自己的线程再回复 //播发器请求的流终于注册上了,切换到自己的线程再回复

View File

@ -196,7 +196,7 @@ bool HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map<int, ts
_timer.reset(); _timer.reset();
weak_ptr<HlsPlayer> weak_self = static_pointer_cast<HlsPlayer>(shared_from_this()); weak_ptr<HlsPlayer> weak_self = static_pointer_cast<HlsPlayer>(shared_from_this());
auto url = ts_map.rbegin()->second.url; auto url = ts_map.rbegin()->second.url;
getPoller()->async([weak_self, url]() { async([weak_self, url]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
strong_self->play(url); strong_self->play(url);

View File

@ -253,6 +253,7 @@ void RtpProcess::emitOnPublish() {
if (!strong_self) { if (!strong_self) {
return; return;
} }
Logger::setThreadContext(weak_self);
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f,
option); option);

View File

@ -26,7 +26,7 @@ RtpServer::~RtpServer() {
} }
} }
class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> { class RtcpHelper: public std::enable_shared_from_this<RtcpHelper>, public toolkit::LogThreadContext {
public: public:
using Ptr = std::shared_ptr<RtcpHelper>; using Ptr = std::shared_ptr<RtcpHelper>;
@ -35,13 +35,15 @@ public:
_stream_id = std::move(stream_id); _stream_id = std::move(stream_id);
} }
~RtcpHelper() { ~RtcpHelper() override {
if (_process) { if (_process) {
// 删除rtp处理器 // 删除rtp处理器
RtpSelector::Instance().delProcess(_stream_id, _process.get()); RtpSelector::Instance().delProcess(_stream_id, _process.get());
} }
} }
std::string getIdentifier() const override { return _process ? _process->getIdentifier() : ""; }
void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc, bool only_audio) { void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc, bool only_audio) {
_local_port = local_port; _local_port = local_port;
_tcp_mode = mode; _tcp_mode = mode;
@ -80,6 +82,7 @@ public:
if (!strong_self || !strong_self->_process) { if (!strong_self || !strong_self->_process) {
return; return;
} }
Logger::setThreadContext(weak_self);
if (!strong_self->_rtcp_addr) { if (!strong_self->_rtcp_addr) {
// 只设置一次rtcp对端端口 // 只设置一次rtcp对端端口
strong_self->_rtcp_addr = std::make_shared<struct sockaddr_storage>(); strong_self->_rtcp_addr = std::make_shared<struct sockaddr_storage>();
@ -96,6 +99,7 @@ public:
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec);
_delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() {
if (auto strong_self = weak_self.lock()) { if (auto strong_self = weak_self.lock()) {
Logger::setThreadContext(weak_self);
auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false);
if (!process && strong_self->_on_detach) { if (!process && strong_self->_on_detach) {
strong_self->_on_detach(); strong_self->_on_detach();
@ -205,6 +209,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
auto ssrc_ptr = std::make_shared<uint32_t>(ssrc); auto ssrc_ptr = std::make_shared<uint32_t>(ssrc);
_ssrc = ssrc_ptr; _ssrc = ssrc_ptr;
rtp_socket->setOnRead([rtp_socket, helper, ssrc_ptr, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { rtp_socket->setOnRead([rtp_socket, helper, ssrc_ptr, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
Logger::setThreadContext(helper);
RtpHeader *header = (RtpHeader *)buf->data(); RtpHeader *header = (RtpHeader *)buf->data();
auto rtp_ssrc = ntohl(header->ssrc); auto rtp_ssrc = ntohl(header->ssrc);
auto ssrc = *ssrc_ptr; auto ssrc = *ssrc_ptr;
@ -277,6 +282,7 @@ void RtpServer::onConnect() {
auto rtp_session = std::make_shared<RtpSession>(_rtp_socket); auto rtp_session = std::make_shared<RtpSession>(_rtp_socket);
rtp_session->attachServer(*_tcp_server); rtp_session->attachServer(*_tcp_server);
_rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { _rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
Logger::setThreadContext(rtp_session);
rtp_session->onRecv(buf); rtp_session->onRecv(buf);
}); });
weak_ptr<RtpServer> weak_self = shared_from_this(); weak_ptr<RtpServer> weak_self = shared_from_this();

View File

@ -352,6 +352,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
Logger::setThreadContext(weakSelf);
if (SockUtil::inet_ntoa(addr) != peer_ip) { if (SockUtil::inet_ntoa(addr) != peer_ip) {
WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(addr); WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(addr);
return; return;
@ -367,6 +368,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
Logger::setThreadContext(weakSelf);
if (SockUtil::inet_ntoa(addr) != peer_ip) { if (SockUtil::inet_ntoa(addr) != peer_ip) {
WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr); WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr);
return; return;

View File

@ -328,6 +328,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int track_idx) {
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
Logger::setThreadContext(weakSelf);
if (SockUtil::inet_ntoa(addr) != peer_ip) { if (SockUtil::inet_ntoa(addr) != peer_ip) {
WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr); WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr);
return; return;

View File

@ -1009,7 +1009,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) {
if (!strong_self) { if (!strong_self) {
return false; return false;
} }
Logger::setThreadContext(weak_self);
if (SockUtil::inet_ntoa(peer_addr) != peer_ip) { if (SockUtil::inet_ntoa(peer_addr) != peer_ip) {
WarnP(strong_self.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") WarnP(strong_self.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< SockUtil::inet_ntoa(peer_addr); << SockUtil::inet_ntoa(peer_addr);
@ -1023,6 +1023,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) {
return; return;
} }
try { try {
Logger::setThreadContext(weak_self);
strong_self->onRcvPeerUdpData(interleaved, buf, addr); strong_self->onRcvPeerUdpData(interleaved, buf, addr);
} catch (SockException &ex) { } catch (SockException &ex) {
strong_self->shutdown(ex); strong_self->shutdown(ex);
@ -1048,7 +1049,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) {
WarnP(this) << "udp端口为空:" << interleaved; WarnP(this) << "udp端口为空:" << interleaved;
return; return;
} }
sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){ sock->setOnRead([onUdpData, interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr, int addr_len) {
onUdpData(pBuf, pPeerAddr, interleaved); onUdpData(pBuf, pPeerAddr, interleaved);
}); });
}; };

View File

@ -125,12 +125,10 @@ void SrtSession::onError(const SockException &err) {
// 防止互相引用导致不释放 // 防止互相引用导致不释放
auto transport = std::move(_transport); auto transport = std::move(_transport);
getPoller()->async( async([transport] {
[transport] { // 延时减引用防止使用transport对象时销毁对象
//延时减引用防止使用transport对象时销毁对象 // transport->onShutdown(err);
//transport->onShutdown(err); },false);
},
false);
} }
void SrtSession::onManager() { void SrtSession::onManager() {

View File

@ -123,6 +123,7 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender) {
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
Logger::setThreadContext(weak_self);
strong_self->onShutdown(SockException(Err_shutdown, err)); strong_self->onShutdown(SockException(Err_shutdown, err));
// 主动关闭推流,那么不延时注销 // 主动关闭推流,那么不延时注销
strong_self->_muxer = nullptr; strong_self->_muxer = nullptr;
@ -158,6 +159,7 @@ void SrtTransportImp::emitOnPublish() {
if (!strong_self) { if (!strong_self) {
return; return;
} }
Logger::setThreadContext(weak_self);
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info,0.0f, strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info,0.0f,
option); option);
@ -187,6 +189,7 @@ void SrtTransportImp::emitOnPlay() {
return; return;
} }
strong_self->getPoller()->async([strong_self, err] { strong_self->getPoller()->async([strong_self, err] {
Logger::setThreadContext(strong_self);
if (err != "") { if (err != "") {
strong_self->onShutdown(SockException(Err_refused, err)); strong_self->onShutdown(SockException(Err_refused, err));
} else { } else {

View File

@ -50,6 +50,7 @@ bool WebRtcPusher::close(MediaSource &sender) {
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
Logger::setThreadContext(strong_self);
strong_self->onShutdown(SockException(Err_shutdown, err)); strong_self->onShutdown(SockException(Err_shutdown, err));
//主动关闭推流,那么不延时注销 //主动关闭推流,那么不延时注销
strong_self->_push_src = nullptr; strong_self->_push_src = nullptr;

View File

@ -77,6 +77,7 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) {
if (strong_server) { if (strong_server) {
auto session = static_pointer_cast<WebRtcSession>(strong_server->createSession(sock)); auto session = static_pointer_cast<WebRtcSession>(strong_server->createSession(sock));
//2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程)重新处理一遍ice binding request命令 //2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程)重新处理一遍ice binding request命令
Logger::setThreadContext(session);
session->onRecv_l(str.data(), str.size()); session->onRecv_l(str.data(), str.size());
} }
}); });
@ -112,6 +113,7 @@ void WebRtcSession::onError(const SockException &err) {
auto transport = std::move(_transport); auto transport = std::move(_transport);
getPoller()->async([transport, self]() mutable { getPoller()->async([transport, self]() mutable {
//延时减引用防止使用transport对象时销毁对象 //延时减引用防止使用transport对象时销毁对象
Logger::setThreadContext(transport);
transport->removeTuple(self.get()); transport->removeTuple(self.get());
//确保transport在Session对象前销毁防止WebRtcTransport::onDestory()时获取不到Session对象 //确保transport在Session对象前销毁防止WebRtcTransport::onDestory()时获取不到Session对象
transport = nullptr; transport = nullptr;

View File

@ -111,7 +111,7 @@ const EventPoller::Ptr &WebRtcTransport::getPoller() const {
return _poller; return _poller;
} }
const string &WebRtcTransport::getIdentifier() const { string WebRtcTransport::getIdentifier() const {
return _identifier; return _identifier;
} }
@ -1081,6 +1081,7 @@ void WebRtcTransportImp::safeShutdown(const SockException &ex) {
std::weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this()); std::weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
getPoller()->async([ex, weak_self]() { getPoller()->async([ex, weak_self]() {
if (auto strong_self = weak_self.lock()) { if (auto strong_self = weak_self.lock()) {
Logger::setThreadContext(weak_self);
strong_self->onShutdown(ex); strong_self->onShutdown(ex);
} }
}); });

View File

@ -35,12 +35,11 @@ extern const std::string kTcpPort;
extern const std::string kTimeOutSec; extern const std::string kTimeOutSec;
}//namespace RTC }//namespace RTC
class WebRtcInterface { class WebRtcInterface : public LogThreadContext {
public: public:
WebRtcInterface() = default; WebRtcInterface() = default;
virtual ~WebRtcInterface() = default; virtual ~WebRtcInterface() = default;
virtual std::string getAnswerSdp(const std::string &offer) = 0; virtual std::string getAnswerSdp(const std::string &offer) = 0;
virtual const std::string& getIdentifier() const = 0;
virtual const std::string& deleteRandStr() const { static std::string s_null; return s_null; } virtual const std::string& deleteRandStr() const { static std::string s_null; return s_null; }
}; };
@ -53,10 +52,6 @@ public:
std::string getAnswerSdp(const std::string &offer) override { std::string getAnswerSdp(const std::string &offer) override {
throw _ex; throw _ex;
} }
const std::string &getIdentifier() const override {
static std::string s_null;
return s_null;
}
private: private:
SockException _ex; SockException _ex;
@ -92,7 +87,7 @@ public:
/** /**
* id * id
*/ */
const std::string& getIdentifier() const override; std::string getIdentifier() const override;
const std::string& deleteRandStr() const override; const std::string& deleteRandStr() const override;
/** /**