From c711eedaa7656a2dba9fa1bd9ad45f2d6aa77c17 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 12 Sep 2020 19:03:52 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8DZLToolKit=E4=BB=A3=E7=A0=81,?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89=E5=88=9B=E5=BB=BA?= =?UTF-8?q?Socket:#468?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- src/Http/HlsPlayer.cpp | 15 ++- src/Http/HttpClient.cpp | 8 +- src/Http/HttpSession.cpp | 8 +- src/Http/HttpTSPlayer.cpp | 8 +- src/Http/WebSocketClient.h | 4 +- src/Http/WebSocketSession.h | 2 +- src/Player/MediaPlayer.cpp | 30 ++++-- src/Player/MediaPlayer.h | 7 +- src/Pusher/MediaPusher.cpp | 34 +++--- src/Pusher/MediaPusher.h | 12 ++- src/Rtmp/RtmpPusher.cpp | 4 +- src/Rtmp/RtmpSession.cpp | 4 +- src/Rtp/PSRtpSender.cpp | 2 +- src/Rtp/RtpServer.cpp | 14 +-- src/Rtp/RtpSession.cpp | 2 +- src/Rtsp/RtpMultiCaster.cpp | 208 ++++++++++++++++++++---------------- src/Rtsp/RtpMultiCaster.h | 63 +++++------ src/Rtsp/Rtsp.cpp | 34 +++--- src/Rtsp/Rtsp.h | 2 +- src/Rtsp/RtspPlayer.cpp | 5 +- src/Rtsp/RtspPusher.cpp | 6 +- src/Rtsp/RtspSession.cpp | 28 ++--- src/Rtsp/UDPServer.cpp | 89 ++++++++------- src/Rtsp/UDPServer.h | 18 ++-- 25 files changed, 331 insertions(+), 278 deletions(-) diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index dfb20fe4..311bee0a 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit dfb20fe40baca2c22212a0f7e253dda8e1d623c5 +Subproject commit 311bee0aeff620f51bf43149b001c62079f40ea7 diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index 97ea6428..232e3453 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -13,7 +13,7 @@ namespace mediakit { HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller){ _segment.setOnSegment([this](const char *data, uint64_t len) { onPacket(data, len); }); - _poller = poller ? poller : EventPollerPool::Instance().getPoller(); + setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } HlsPlayer::~HlsPlayer() {} @@ -63,6 +63,15 @@ void HlsPlayer::playNextTs(bool force){ std::shared_ptr ticker(new Ticker); _http_ts_player = std::make_shared(getPoller(), false); + + _http_ts_player->setOnCreateSocket([weakSelf](const EventPoller::Ptr &poller) { + auto strongSelf = weakSelf.lock(); + if (strongSelf) { + return strongSelf->createSocket(); + } + return Socket::createSocket(poller, true); + }); + _http_ts_player->setOnDisconnect([weakSelf, ticker, ts_duration](const SockException &err) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { @@ -84,6 +93,7 @@ void HlsPlayer::playNextTs(bool force){ }, strongSelf->getPoller())); } }); + _http_ts_player->setOnPacket([weakSelf](const char *data, uint64_t len) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { @@ -94,9 +104,10 @@ void HlsPlayer::playNextTs(bool force){ }); _http_ts_player->setMethod("GET"); - if(!(*this)[kNetAdapter].empty()) { + if (!(*this)[kNetAdapter].empty()) { _http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]); } + _http_ts_player->sendRequest(_ts_list.front().url, 2 * _ts_list.front().duration); _ts_list.pop_front(); } diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index c2c22745..27549e6a 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -100,7 +100,7 @@ void HttpClient::onConnect(const SockException &ex) { } //先假设http客户端只会接收一点点数据(只接受http头,节省内存) - _sock->setReadBuffer(std::make_shared(1 * 1024)); + getSock()->setReadBuffer(std::make_shared(1 * 1024)); _totalBodySize = 0; _recvedBodySize = 0; @@ -157,7 +157,7 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { if(_parser["Transfer-Encoding"] == "chunked"){ //我们认为这种情况下后面应该有大量的数据过来,加大接收缓存提高性能 - _sock->setReadBuffer(std::make_shared(256 * 1024)); + getSock()->setReadBuffer(std::make_shared(256 * 1024)); //如果Transfer-Encoding字段等于chunked,则认为后续的content是不限制长度的 _totalBodySize = -1; @@ -185,9 +185,9 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { _recvedBodySize = 0; if(_totalBodySize > 0){ //根据_totalBodySize设置接收缓存大小 - _sock->setReadBuffer(std::make_shared(MIN(_totalBodySize + 1,256 * 1024))); + getSock()->setReadBuffer(std::make_shared(MIN(_totalBodySize + 1,256 * 1024))); }else{ - _sock->setReadBuffer(std::make_shared(256 * 1024)); + getSock()->setReadBuffer(std::make_shared(256 * 1024)); } return -1; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 7e26502a..3f173b05 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -476,7 +476,7 @@ void HttpSession::sendResponse(const char *pcStatus, //发送http body AsyncSenderData::Ptr data = std::make_shared(shared_from_this(),body,bClose); - _sock->setOnFlush([data](){ + getSock()->setOnFlush([data](){ return AsyncSender::onSocketFlushed(data); }); AsyncSender::onSocketFlushed(data); @@ -543,10 +543,10 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) { //根据Content-Length设置接收缓存大小 if(totalContentLen > 0){ - _sock->setReadBuffer(std::make_shared(MIN(totalContentLen + 1,256 * 1024))); + getSock()->setReadBuffer(std::make_shared(MIN(totalContentLen + 1,256 * 1024))); }else{ //不定长度的Content-Length - _sock->setReadBuffer(std::make_shared(256 * 1024)); + getSock()->setReadBuffer(std::make_shared(256 * 1024)); } if(totalContentLen > 0 && totalContentLen < maxReqSize ){ @@ -610,7 +610,7 @@ void HttpSession::setSocketFlags(){ GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); if(mergeWriteMS > 0) { //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 - SockUtil::setNoDelay(_sock->rawFD(), false); + SockUtil::setNoDelay(getSock()->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } diff --git a/src/Http/HttpTSPlayer.cpp b/src/Http/HttpTSPlayer.cpp index fb525f11..706e1df3 100644 --- a/src/Http/HttpTSPlayer.cpp +++ b/src/Http/HttpTSPlayer.cpp @@ -12,9 +12,9 @@ namespace mediakit { HttpTSPlayer::HttpTSPlayer(const EventPoller::Ptr &poller, bool split_ts){ - _segment.setOnSegment([this](const char *data, uint64_t len) { onPacket(data, len); }); - _poller = poller ? poller : EventPollerPool::Instance().getPoller(); _split_ts = split_ts; + _segment.setOnSegment([this](const char *data, uint64_t len) { onPacket(data, len); }); + setPoller(poller ? poller : EventPollerPool::Instance().getPoller()); } HttpTSPlayer::~HttpTSPlayer() {} @@ -25,8 +25,8 @@ int64_t HttpTSPlayer::onResponseHeader(const string &status, const HttpClient::H shutdown(SockException(Err_other, StrPrinter << "bad http status code:" + status)); return 0; } - auto contet_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; - if (contet_type.find("video/mp2t") == 0 || contet_type.find("video/mpeg") == 0) { + auto content_type = const_cast< HttpClient::HttpHeader &>(headers)["Content-Type"]; + if (content_type.find("video/mp2t") == 0 || content_type.find("video/mpeg") == 0) { _is_ts_content = true; } diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index d92f1e33..4ecce5cb 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -74,7 +74,7 @@ public: HttpWsClient(ClientTypeImp &delegate) : _delegate(delegate){ _Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false))); - _poller = delegate.getPoller(); + setPoller(delegate.getPoller()); } ~HttpWsClient(){} @@ -312,7 +312,7 @@ private: }); //设置sock,否则shutdown等接口都无效 - _delegate.setSock(HttpClientImp::_sock); + _delegate.setSock(HttpClientImp::getSock()); //触发连接成功事件 _delegate.onConnect(ex); //拦截websocket数据接收 diff --git a/src/Http/WebSocketSession.h b/src/Http/WebSocketSession.h index 65840250..69b691f1 100644 --- a/src/Http/WebSocketSession.h +++ b/src/Http/WebSocketSession.h @@ -117,7 +117,7 @@ protected: */ bool onWebSocketConnect(const Parser &header) override{ //创建websocket session类 - _session = _creator(header, *this,HttpSessionType::_sock); + _session = _creator(header, *this,HttpSessionType::getSock()); if(!_session){ //此url不允许创建websocket连接 return false; diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index 6b95a4eb..e77888f3 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -17,31 +17,43 @@ using namespace toolkit; namespace mediakit { MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) { - _poller = poller; - if(!_poller){ - _poller = EventPollerPool::Instance().getPoller(); - } + _poller = poller ? poller : EventPollerPool::Instance().getPoller(); } MediaPlayer::~MediaPlayer() { } -void MediaPlayer::play(const string &strUrl) { - _delegate = PlayerBase::createPlayer(_poller,strUrl); + +static void setOnCreateSocket_l(const std::shared_ptr &delegate, const Socket::onCreateSocket &cb){ + auto helper = dynamic_pointer_cast(delegate); + if (helper) { + helper->setOnCreateSocket(cb); + } +} + +void MediaPlayer::play(const string &url) { + _delegate = PlayerBase::createPlayer(_poller, url); + assert(_delegate); + setOnCreateSocket_l(_delegate, _on_create_socket); _delegate->setOnShutdown(_shutdownCB); _delegate->setOnPlayResult(_playResultCB); _delegate->setOnResume(_resumeCB); _delegate->setMediaSouce(_pMediaSrc); _delegate->mINI::operator=(*this); - _delegate->play(strUrl); + _delegate->play(url); } EventPoller::Ptr MediaPlayer::getPoller(){ return _poller; } -void MediaPlayer::pause(bool bPause) { +void MediaPlayer::setOnCreateSocket(Socket::onCreateSocket cb){ + setOnCreateSocket_l(_delegate, cb); + _on_create_socket = std::move(cb); +} + +void MediaPlayer::pause(bool pause) { if (_delegate) { - _delegate->pause(bPause); + _delegate->pause(pause); } } diff --git a/src/Player/MediaPlayer.h b/src/Player/MediaPlayer.h index 42b6eb61..0e41deb1 100644 --- a/src/Player/MediaPlayer.h +++ b/src/Player/MediaPlayer.h @@ -27,12 +27,15 @@ public: MediaPlayer(const EventPoller::Ptr &poller = nullptr); virtual ~MediaPlayer(); - void play(const string &strUrl) override; - void pause(bool bPause) override; + void play(const string &url) override; + void pause(bool pause) override; void teardown() override; EventPoller::Ptr getPoller(); + void setOnCreateSocket(Socket::onCreateSocket cb); + private: EventPoller::Ptr _poller; + Socket::onCreateSocket _on_create_socket; }; } /* namespace mediakit */ diff --git a/src/Pusher/MediaPusher.cpp b/src/Pusher/MediaPusher.cpp index f7fca4ed..822f5103 100644 --- a/src/Pusher/MediaPusher.cpp +++ b/src/Pusher/MediaPusher.cpp @@ -19,34 +19,44 @@ namespace mediakit { MediaPusher::MediaPusher(const MediaSource::Ptr &src, const EventPoller::Ptr &poller) { _src = src; - _poller = poller; - if(!_poller){ - _poller = EventPollerPool::Instance().getPoller(); - } + _poller = poller ? poller : EventPollerPool::Instance().getPoller(); } MediaPusher::MediaPusher(const string &schema, - const string &strVhost, - const string &strApp, - const string &strStream, + const string &vhost, + const string &app, + const string &stream, const EventPoller::Ptr &poller) : - MediaPusher(MediaSource::find(schema,strVhost,strApp,strStream),poller){ + MediaPusher(MediaSource::find(schema, vhost, app, stream), poller){ } MediaPusher::~MediaPusher() { } -void MediaPusher::publish(const string &strUrl) { - _delegate = PusherBase::createPusher(_poller,_src.lock(),strUrl); + +static void setOnCreateSocket_l(const std::shared_ptr &delegate, const Socket::onCreateSocket &cb){ + auto helper = dynamic_pointer_cast(delegate); + if (helper) { + helper->setOnCreateSocket(cb); + } +} + +void MediaPusher::publish(const string &url) { + _delegate = PusherBase::createPusher(_poller, _src.lock(), url); + assert(_delegate); + setOnCreateSocket_l(_delegate, _on_create_socket); _delegate->setOnShutdown(_shutdownCB); _delegate->setOnPublished(_publishCB); _delegate->mINI::operator=(*this); - _delegate->publish(strUrl); + _delegate->publish(url); } EventPoller::Ptr MediaPusher::getPoller(){ return _poller; } - +void MediaPusher::setOnCreateSocket(Socket::onCreateSocket cb){ + setOnCreateSocket_l(_delegate, cb); + _on_create_socket = std::move(cb); +} } /* namespace mediakit */ diff --git a/src/Pusher/MediaPusher.h b/src/Pusher/MediaPusher.h index 5e4974fb..2e2f985e 100644 --- a/src/Pusher/MediaPusher.h +++ b/src/Pusher/MediaPusher.h @@ -24,20 +24,24 @@ public: typedef std::shared_ptr Ptr; MediaPusher(const string &schema, - const string &strVhost, - const string &strApp, - const string &strStream, + const string &vhost, + const string &app, + const string &stream, const EventPoller::Ptr &poller = nullptr); MediaPusher(const MediaSource::Ptr &src, const EventPoller::Ptr &poller = nullptr); virtual ~MediaPusher(); - void publish(const string &strUrl) override; + + void publish(const string &url) override; EventPoller::Ptr getPoller(); + void setOnCreateSocket(Socket::onCreateSocket cb); + private: std::weak_ptr _src; EventPoller::Ptr _poller; + Socket::onCreateSocket _on_create_socket; }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 42f0dc82..201d6f9e 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -120,7 +120,7 @@ void RtmpPusher::onConnect(const SockException &err){ return; } //推流器不需要多大的接收缓存,节省内存占用 - _sock->setReadBuffer(std::make_shared(1 * 1024)); + getSock()->setReadBuffer(std::make_shared(1 * 1024)); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); startClientSession([weak_self]() { @@ -239,7 +239,7 @@ void RtmpPusher::setSocketFlags(){ if (mergeWriteMS > 0) { //提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); - SockUtil::setNoDelay(_sock->rawFD(), false); + SockUtil::setNoDelay(getSock()->rawFD(), false); } } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 8c123fc3..03b2760a 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -153,7 +153,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _publisher_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4); //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 - _sock->setReadBuffer(std::make_shared(256 * 1024)); + getSock()->setReadBuffer(std::make_shared(256 * 1024)); setSocketFlags(); }; @@ -548,7 +548,7 @@ void RtmpSession::setSocketFlags(){ GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); if (merge_write_ms > 0) { //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 - SockUtil::setNoDelay(_sock->rawFD(), false); + SockUtil::setNoDelay(getSock()->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } diff --git a/src/Rtp/PSRtpSender.cpp b/src/Rtp/PSRtpSender.cpp index fe97b9a3..5f24a775 100644 --- a/src/Rtp/PSRtpSender.cpp +++ b/src/Rtp/PSRtpSender.cpp @@ -31,7 +31,7 @@ PSRtpSender::~PSRtpSender() { void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ _is_udp = is_udp; - _socket = std::make_shared(_poller, false); + _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; _dst_port = dst_port; weak_ptr weak_self = shared_from_this(); diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index f7a4c9eb..5b92226b 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -24,7 +24,7 @@ RtpServer::~RtpServer() { void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { //创建udp服务器 - Socket::Ptr udp_server = std::make_shared(nullptr, false); + Socket::Ptr udp_server = Socket::createSocket(nullptr, false); if (!udp_server->bindUdpSock(local_port, local_ip)) { throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); } @@ -33,14 +33,10 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable TcpServer::Ptr tcp_server; if (enable_tcp) { - try { - //创建tcp服务器 - tcp_server = std::make_shared(udp_server->getPoller()); - (*tcp_server)[RtpSession::kStreamID] = stream_id; - tcp_server->start(udp_server->get_local_port(), local_ip); - } catch (...) { - throw; - } + //创建tcp服务器 + tcp_server = std::make_shared(udp_server->getPoller()); + (*tcp_server)[RtpSession::kStreamID] = stream_id; + tcp_server->start(udp_server->get_local_port(), local_ip); } RtpProcess::Ptr process; diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 2e3f63fe..72037b5e 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -70,7 +70,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(_sock, data + 2, len - 2, &addr); + _process->inputRtp(getSock(), data + 2, len - 2, &addr); _ticker.resetTime(); } diff --git a/src/Rtsp/RtpMultiCaster.cpp b/src/Rtsp/RtpMultiCaster.cpp index 03f84a66..27226540 100644 --- a/src/Rtsp/RtpMultiCaster.cpp +++ b/src/Rtsp/RtpMultiCaster.cpp @@ -25,153 +25,177 @@ MultiCastAddressMaker &MultiCastAddressMaker::Instance() { return instance; } -static uint32_t addressToInt(const string &ip){ - struct in_addr addr; - bzero(&addr,sizeof(addr)); - addr.s_addr = inet_addr(ip.data()); - return (uint32_t)ntohl((uint32_t &)addr.s_addr); +bool MultiCastAddressMaker::isMultiCastAddress(uint32_t addr) { + static uint32_t addrMin = mINI::Instance()[MultiCast::kAddrMin].as(); + static uint32_t addrMax = mINI::Instance()[MultiCast::kAddrMax].as(); + return addr >= addrMin && addr <= addrMax; } -std::shared_ptr MultiCastAddressMaker::obtain(uint32_t iTry) { +string MultiCastAddressMaker::toString(uint32_t addr) { + addr = htonl(addr); + return SockUtil::inet_ntoa((struct in_addr &) (addr)); +} + +static uint32_t addressToInt(const string &ip){ + struct in_addr addr; + bzero(&addr, sizeof(addr)); + addr.s_addr = inet_addr(ip.data()); + return (uint32_t) ntohl((uint32_t &) addr.s_addr); +} + +std::shared_ptr MultiCastAddressMaker::obtain(uint32_t max_try) { lock_guard lck(_mtx); - GET_CONFIG(string,addrMinStr,MultiCast::kAddrMin); - GET_CONFIG(string,addrMaxStr,MultiCast::kAddrMax); + GET_CONFIG(string, addrMinStr, MultiCast::kAddrMin); + GET_CONFIG(string, addrMaxStr, MultiCast::kAddrMax); uint32_t addrMin = addressToInt(addrMinStr); uint32_t addrMax = addressToInt(addrMaxStr); - if(_iAddr > addrMax || _iAddr == 0){ - _iAddr = addrMin; + if (_addr > addrMax || _addr == 0) { + _addr = addrMin; } - auto iGotAddr = _iAddr++; - if(_setBadAddr.find(iGotAddr) != _setBadAddr.end()){ + auto iGotAddr = _addr++; + if (_used_addr.find(iGotAddr) != _used_addr.end()) { //已经分配过了 - if(iTry){ - return obtain(--iTry); + if (max_try) { + return obtain(--max_try); } //分配完了,应该不可能到这里 ErrorL; return nullptr; } - _setBadAddr.emplace(iGotAddr); - std::shared_ptr ret(new uint32_t(iGotAddr),[](uint32_t *ptr){ + _used_addr.emplace(iGotAddr); + std::shared_ptr ret(new uint32_t(iGotAddr), [](uint32_t *ptr) { MultiCastAddressMaker::Instance().release(*ptr); delete ptr; }); return ret; } -void MultiCastAddressMaker::release(uint32_t iAddr){ + +void MultiCastAddressMaker::release(uint32_t addr){ lock_guard lck(_mtx); - _setBadAddr.erase(iAddr); + _used_addr.erase(addr); } +//////////////////////////////////////////////////////////////////////////////////// -recursive_mutex RtpMultiCaster::g_mtx; -unordered_map > RtpMultiCaster::g_mapBroadCaster; +recursive_mutex g_mtx; +unordered_map > g_multi_caster_map; void RtpMultiCaster::setDetachCB(void* listener, const onDetach& cb) { lock_guard lck(_mtx); - if(cb){ - _mapDetach.emplace(listener,cb); - }else{ - _mapDetach.erase(listener); + if (cb) { + _detach_map.emplace(listener, cb); + } else { + _detach_map.erase(listener); } } + RtpMultiCaster::~RtpMultiCaster() { - _pReader->setReadCB(nullptr); - _pReader->setDetachCB(nullptr); + _rtp_reader->setReadCB(nullptr); + _rtp_reader->setDetachCB(nullptr); DebugL; } -RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) { - auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA,strVhost,strApp, strStream)); - if(!src){ - auto strErr = StrPrinter << "未找到媒体源:" << strVhost << " " << strApp << " " << strStream << endl; - throw std::runtime_error(strErr); +RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream) { + auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, vhost, app, stream)); + if (!src) { + auto err = StrPrinter << "未找到媒体源:" << vhost << " " << app << " " << stream << endl; + throw std::runtime_error(err); + } + _multicast_ip = MultiCastAddressMaker::Instance().obtain(); + if (!_multicast_ip) { + throw std::runtime_error("获取组播地址失败"); } - _multiAddr = MultiCastAddressMaker::Instance().obtain(); - for(auto i = 0; i < 2; i++){ - _apUdpSock[i].reset(new Socket(poller)); - if(!_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){ - auto strErr = StrPrinter << "绑定UDP端口失败:" << strLocalIp << endl; - throw std::runtime_error(strErr); - } - auto fd = _apUdpSock[i]->rawFD(); - GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL); + for (auto i = 0; i < 2; ++i) { + //创建udp socket, 数组下标为TrackType + _udp_sock[i] = helper.createSocket(); + if (!_udp_sock[i]->bindUdpSock(0, local_ip.data())) { + auto err = StrPrinter << "绑定UDP端口失败:" << local_ip << endl; + throw std::runtime_error(err); + } + auto fd = _udp_sock[i]->rawFD(); + GET_CONFIG(uint32_t, udpTTL, MultiCast::kUdpTTL); SockUtil::setMultiTTL(fd, udpTTL); SockUtil::setMultiLOOP(fd, false); - SockUtil::setMultiIF(fd, strLocalIp.data()); + SockUtil::setMultiIF(fd, local_ip.data()); - struct sockaddr_in &peerAddr = _aPeerUdpAddr[i]; - peerAddr.sin_family = AF_INET; - peerAddr.sin_port = htons(_apUdpSock[i]->get_local_port()); - peerAddr.sin_addr.s_addr = htonl(*_multiAddr); - bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); - _apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr); + struct sockaddr_in peer; + peer.sin_family = AF_INET; + //组播目标端口为本地发送端口 + peer.sin_port = htons(_udp_sock[i]->get_local_port()); + //组播目标地址 + peer.sin_addr.s_addr = htonl(*_multicast_ip); + bzero(&(peer.sin_zero), sizeof peer.sin_zero); + _udp_sock[i]->setSendPeerAddr((struct sockaddr *) &peer); } - _pReader = src->getRing()->attach(poller); - _pReader->setReadCB([this](const RtspMediaSource::RingDataType &pkt){ + + _rtp_reader = src->getRing()->attach(helper.getPoller()); + _rtp_reader->setReadCB([this](const RtspMediaSource::RingDataType &pkt) { int i = 0; int size = pkt->size(); pkt->for_each([&](const RtpPacket::Ptr &rtp) { - auto &pSock = _apUdpSock[rtp->type]; - auto &peerAddr = _aPeerUdpAddr[rtp->type]; - BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); - pSock->send(buffer, nullptr, 0, ++i == size); + auto &sock = _udp_sock[rtp->type]; + sock->send(std::make_shared(rtp, 4), nullptr, 0, ++i == size); }); }); - _pReader->setDetachCB([this](){ - unordered_map _mapDetach_copy; + _rtp_reader->setDetachCB([this]() { + unordered_map _detach_map_copy; { lock_guard lck(_mtx); - _mapDetach_copy = std::move(_mapDetach); + _detach_map_copy = std::move(_detach_map); } - for(auto &pr : _mapDetach_copy){ + for (auto &pr : _detach_map_copy) { pr.second(); } }); - DebugL << MultiCastAddressMaker::toString(*_multiAddr) << " " - << _apUdpSock[0]->get_local_port() << " " - << _apUdpSock[1]->get_local_port() << " " - << strVhost << " " - << strApp << " " << strStream; -} -uint16_t RtpMultiCaster::getPort(TrackType trackType){ - return _apUdpSock[trackType]->get_local_port(); -} -string RtpMultiCaster::getIP(){ - return SockUtil::inet_ntoa(_aPeerUdpAddr[0].sin_addr); -} -RtpMultiCaster::Ptr RtpMultiCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){ - try{ - auto ret = Ptr(new RtpMultiCaster(poller,strLocalIp,strVhost,strApp,strStream),[poller](RtpMultiCaster *ptr){ - poller->async([ptr]() { - delete ptr; - }); - }); - lock_guard lck(g_mtx); - string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl; - weak_ptr weakPtr = ret; - g_mapBroadCaster.emplace(strKey,weakPtr); - return ret; - }catch (std::exception &ex) { - WarnL << ex.what(); - return nullptr; - } + + DebugL << MultiCastAddressMaker::toString(*_multicast_ip) << " " + << _udp_sock[0]->get_local_port() << " " + << _udp_sock[1]->get_local_port() << " " + << vhost << " " << app << " " << stream; } -RtpMultiCaster::Ptr RtpMultiCaster::get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) { - string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl; +uint16_t RtpMultiCaster::getMultiCasterPort(TrackType trackType) { + return _udp_sock[trackType]->get_local_port(); +} + +string RtpMultiCaster::getMultiCasterIP() { + struct in_addr addr; + addr.s_addr = htonl(*_multicast_ip); + return SockUtil::inet_ntoa(addr); +} + +RtpMultiCaster::Ptr RtpMultiCaster::get(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream) { + static auto on_create = [](SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream){ + try { + auto poller = helper.getPoller(); + auto ret = RtpMultiCaster::Ptr(new RtpMultiCaster(helper, local_ip, vhost, app, stream), [poller](RtpMultiCaster *ptr) { + poller->async([ptr]() { + delete ptr; + }); + }); + lock_guard lck(g_mtx); + string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl; + g_multi_caster_map.emplace(strKey, ret); + return ret; + } catch (std::exception &ex) { + WarnL << ex.what(); + return RtpMultiCaster::Ptr(); + } + }; + + string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl; lock_guard lck(g_mtx); - auto it = g_mapBroadCaster.find(strKey); - if (it == g_mapBroadCaster.end()) { - return make(poller,strLocalIp,strVhost,strApp, strStream); + auto it = g_multi_caster_map.find(strKey); + if (it == g_multi_caster_map.end()) { + return on_create(helper, local_ip, vhost, app, stream); } auto ret = it->second.lock(); if (!ret) { - g_mapBroadCaster.erase(it); - return make(poller,strLocalIp,strVhost,strApp, strStream); + g_multi_caster_map.erase(it); + return on_create(helper, local_ip, vhost, app, stream); } return ret; } diff --git a/src/Rtsp/RtpMultiCaster.h b/src/Rtsp/RtpMultiCaster.h index d3893ad7..8f8e76a6 100644 --- a/src/Rtsp/RtpMultiCaster.h +++ b/src/Rtsp/RtpMultiCaster.h @@ -11,7 +11,6 @@ #ifndef SRC_RTSP_RTPBROADCASTER_H_ #define SRC_RTSP_RTPBROADCASTER_H_ - #include #include #include @@ -20,60 +19,52 @@ #include "RtspMediaSource.h" #include "Util/mini.h" #include "Network/Socket.h" - using namespace std; using namespace toolkit; namespace mediakit{ -class MultiCastAddressMaker -{ +class MultiCastAddressMaker { public: - static MultiCastAddressMaker &Instance(); + ~MultiCastAddressMaker() {} + static MultiCastAddressMaker& Instance(); + static bool isMultiCastAddress(uint32_t addr); + static string toString(uint32_t addr); + + std::shared_ptr obtain(uint32_t max_try = 10); - static bool isMultiCastAddress(uint32_t iAddr){ - static uint32_t addrMin = mINI::Instance()[MultiCast::kAddrMin].as(); - static uint32_t addrMax = mINI::Instance()[MultiCast::kAddrMax].as(); - return iAddr >= addrMin && iAddr <= addrMax; - } - static string toString(uint32_t iAddr){ - iAddr = htonl(iAddr); - return SockUtil::inet_ntoa((struct in_addr &)(iAddr)); - } - virtual ~MultiCastAddressMaker(){} - std::shared_ptr obtain(uint32_t iTry = 10); private: - MultiCastAddressMaker(){}; - void release(uint32_t iAddr); - uint32_t _iAddr = 0; + MultiCastAddressMaker() {}; + void release(uint32_t addr); + +private: + uint32_t _addr = 0; recursive_mutex _mtx; - unordered_set _setBadAddr; + unordered_set _used_addr; }; + class RtpMultiCaster { public: typedef std::shared_ptr Ptr; typedef function onDetach; - virtual ~RtpMultiCaster(); - static Ptr get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); + ~RtpMultiCaster(); + + static Ptr get(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream); void setDetachCB(void *listener,const onDetach &cb); - uint16_t getPort(TrackType trackType); - string getIP(); + + string getMultiCasterIP(); + uint16_t getMultiCasterPort(TrackType trackType); + private: - static recursive_mutex g_mtx; - static unordered_map > g_mapBroadCaster; - static Ptr make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); + RtpMultiCaster(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream); - std::shared_ptr _multiAddr; +private: recursive_mutex _mtx; - unordered_map _mapDetach; - RtspMediaSource::RingType::RingReader::Ptr _pReader; - Socket::Ptr _apUdpSock[2]; - struct sockaddr_in _aPeerUdpAddr[2]; - - RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); - + Socket::Ptr _udp_sock[2]; + std::shared_ptr _multicast_ip; + unordered_map _detach_map; + RtspMediaSource::RingType::RingReader::Ptr _rtp_reader; }; }//namespace mediakit - #endif /* SRC_RTSP_RTPBROADCASTER_H_ */ diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index c93331d6..58944ca8 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -365,8 +365,10 @@ bool RtspUrl::setup(bool isSSL, const string &strUrl, const string &strUser, con return true; } -std::pair makeSockPair_l(const EventPoller::Ptr &poller, const string &local_ip){ - auto pSockRtp = std::make_shared(poller); +static void makeSockPair_l(std::pair &pair, const string &local_ip){ + auto &pSockRtp = pair.first; + auto &pSockRtcp = pair.second; + if (!pSockRtp->bindUdpSock(0, local_ip.data())) { //分配端口失败 throw runtime_error("open udp socket failed"); @@ -374,7 +376,6 @@ std::pair makeSockPair_l(const EventPoller::Ptr &polle //是否是偶数 bool even_numbers = pSockRtp->get_local_port() % 2 == 0; - auto pSockRtcp = std::make_shared(poller); if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + (even_numbers ? 1 : -1), local_ip.data())) { //分配端口失败 throw runtime_error("open udp socket failed"); @@ -386,22 +387,21 @@ std::pair makeSockPair_l(const EventPoller::Ptr &polle pSockRtp = pSockRtcp; pSockRtcp = tmp; } - - return std::make_pair(pSockRtp, pSockRtcp); } -std::pair makeSockPair(const EventPoller::Ptr &poller, const string &local_ip){ - int try_count = 0; - while (true) { - try { - return makeSockPair_l(poller, local_ip); - } catch (...) { - if (++try_count == 3) { - throw; - } - WarnL << "open udp socket failed, retry: " << try_count; - } - } + void makeSockPair(std::pair &pair, const string &local_ip){ + int try_count = 0; + while (true) { + try { + makeSockPair_l(pair, local_ip); + break; + } catch (...) { + if (++try_count == 3) { + throw; + } + WarnL << "open udp socket failed, retry: " << try_count; + } + } } string printSSRC(uint32_t ui32Ssrc) { diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index ff31295c..eb27510b 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -271,7 +271,7 @@ private: _StrPrinter _printer; }; -std::pair makeSockPair(const EventPoller::Ptr &poller, const string &local_ip); +void makeSockPair(std::pair &pair, const string &local_ip); string printSSRC(uint32_t ui32Ssrc); } //namespace mediakit diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 6fb012f2..dc899a5d 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -211,7 +211,8 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){ auto &rtpSockRef = _rtp_sock[track_idx]; auto &rtcpSockRef = _rtcp_sock[track_idx]; if (!rtpSockRef || !rtcpSockRef) { - auto pr = makeSockPair(getPoller(), get_local_ip()); + std::pair pr = std::make_pair(createSocket(), createSocket()); + makeSockPair(pr, get_local_ip()); rtpSockRef = pr.first; rtcpSockRef = pr.second; } @@ -280,7 +281,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) { if (_rtp_type == Rtsp::RTP_MULTICAST) { //udp组播 auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";"); - pRtpSockRef.reset(new Socket(getPoller())); + pRtpSockRef = createSocket(); if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) { pRtpSockRef.reset(); throw std::runtime_error("open udp sock err"); diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index fef34056..4728c8b4 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -122,7 +122,7 @@ void RtspPusher::onConnect(const SockException &err) { return; } //推流器不需要多大的接收缓存,节省内存占用 - _sock->setReadBuffer(std::make_shared(1 * 1024)); + getSock()->setReadBuffer(std::make_shared(1 * 1024)); sendAnnounce(); } @@ -228,7 +228,7 @@ bool RtspPusher::handleAuthenticationFailure(const string ¶ms_str) { void RtspPusher::createUdpSockIfNecessary(int track_idx){ auto &rtp_sock = _udp_socks[track_idx]; if (!rtp_sock) { - rtp_sock.reset(new Socket(getPoller())); + rtp_sock = createSocket(); //rtp随机端口 if (!rtp_sock->bindUdpSock(0, get_local_ip().data())) { rtp_sock.reset(); @@ -400,7 +400,7 @@ void RtspPusher::setSocketFlags(){ if (merge_write_ms > 0) { //提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); - SockUtil::setNoDelay(_sock->rawFD(), false); + SockUtil::setNoDelay(getSock()->rawFD(), false); } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 9eef2b26..42077ba0 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -277,7 +277,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); if(_rtp_type == Rtsp::RTP_TCP){ //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能 - _sock->setReadBuffer(std::make_shared(256 * 1024)); + getSock()->setReadBuffer(std::make_shared(256 * 1024)); setSocketFlags(); } }; @@ -667,10 +667,10 @@ void RtspSession::handleReq_Setup(const Parser &parser) { break; case Rtsp::RTP_UDP: { - std::pair pr; - try{ - pr = makeSockPair(_sock->getPoller(), get_local_ip()); - }catch(std::exception &ex) { + std::pair pr = std::make_pair(createSocket(),createSocket()); + try { + makeSockPair(pr, get_local_ip()); + } catch (std::exception &ex) { //分配端口失败 send_NotAcceptable(); throw SockException(Err_shutdown, ex.what()); @@ -681,8 +681,8 @@ void RtspSession::handleReq_Setup(const Parser &parser) { //设置客户端内网端口信息 string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL); - uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data()); - uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data()); + uint16_t ui16RtpPort = atoi(FindField(strClientPort.data(), NULL, "-").data()); + uint16_t ui16RtcpPort = atoi(FindField(strClientPort.data(), "-", NULL).data()); struct sockaddr_in peerAddr; //设置rtp发送目标地址 @@ -690,14 +690,14 @@ void RtspSession::handleReq_Setup(const Parser &parser) { peerAddr.sin_port = htons(ui16RtpPort); peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data()); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); - pr.first->setSendPeerAddr((struct sockaddr *)(&peerAddr)); + pr.first->setSendPeerAddr((struct sockaddr *) (&peerAddr)); //设置rtcp发送目标地址 peerAddr.sin_family = AF_INET; peerAddr.sin_port = htons(ui16RtcpPort); peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data()); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); - pr.second->setSendPeerAddr((struct sockaddr *)(&peerAddr)); + pr.second->setSendPeerAddr((struct sockaddr *) (&peerAddr)); //尝试获取客户端nat映射地址 startListenPeerUdpData(trackIdx); @@ -714,7 +714,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) { break; case Rtsp::RTP_MULTICAST: { if(!_multicaster){ - _multicaster = RtpMultiCaster::get(getPoller(), get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid); + _multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid); if (!_multicaster) { send_NotAcceptable(); throw SockException(Err_shutdown, "can not get a available udp multicast socket"); @@ -728,10 +728,10 @@ void RtspSession::handleReq_Setup(const Parser &parser) { strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached")); }); } - int iSrvPort = _multicaster->getPort(trackRef->_type); + int iSrvPort = _multicaster->getMultiCasterPort(trackRef->_type); //我们用trackIdx区分rtp和rtcp包 //由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口 - auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); + auto pSockRtcp = UDPServer::Instance().getSock(*this, get_local_ip().data(), 2 * trackIdx + 1, iSrvPort + 1); if (!pSockRtcp) { //分配端口失败 send_NotAcceptable(); @@ -742,7 +742,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) { sendRtspResponse("200 OK", {"Transport", StrPrinter << "RTP/AVP;multicast;" - << "destination=" << _multicaster->getIP() << ";" + << "destination=" << _multicaster->getMultiCasterIP() << ";" << "source=" << get_local_ip() << ";" << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";" << "ttl=" << udpTTL << ";" @@ -1230,7 +1230,7 @@ void RtspSession::setSocketFlags(){ GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); if(mergeWriteMS > 0) { //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 - SockUtil::setNoDelay(_sock->rawFD(), false); + SockUtil::setNoDelay(getSock()->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } diff --git a/src/Rtsp/UDPServer.cpp b/src/Rtsp/UDPServer.cpp index a31b3815..ce69652e 100644 --- a/src/Rtsp/UDPServer.cpp +++ b/src/Rtsp/UDPServer.cpp @@ -25,72 +25,71 @@ UDPServer::~UDPServer() { InfoL; } -Socket::Ptr UDPServer::getSock(const EventPoller::Ptr &poller,const char* strLocalIp, int intervaled,uint16_t iLocalPort) { - lock_guard lck(_mtxUpdSock); - string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl; - auto it = _mapUpdSock.find(strKey); - if (it == _mapUpdSock.end()) { - Socket::Ptr pSock(new Socket(poller)); - //InfoL<bindUdpSock(iLocalPort, strLocalIp)) { +Socket::Ptr UDPServer::getSock(SocketHelper &helper, const char* local_ip, int interleaved, uint16_t local_port) { + lock_guard lck(_mtx_udp_sock); + string key = StrPrinter << local_ip << ":" << interleaved << endl; + auto it = _udp_sock_map.find(key); + if (it == _udp_sock_map.end()) { + Socket::Ptr sock = helper.createSocket(); + if (!sock->bindUdpSock(local_port, local_ip)) { //分配失败 return nullptr; } - pSock->setOnRead(bind(&UDPServer::onRcvData, this, intervaled, placeholders::_1,placeholders::_2)); - pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1)); - _mapUpdSock[strKey] = pSock; - DebugL << strLocalIp << " " << pSock->get_local_port() << " " << intervaled; - return pSock; + sock->setOnErr(bind(&UDPServer::onErr, this, key, placeholders::_1)); + sock->setOnRead(bind(&UDPServer::onRecv, this, interleaved, placeholders::_1, placeholders::_2)); + _udp_sock_map[key] = sock; + DebugL << local_ip << " " << sock->get_local_port() << " " << interleaved; + return sock; } return it->second; } -void UDPServer::listenPeer(const char* strPeerIp, void* pSelf, const onRecvData& cb) { - lock_guard lck(_mtxDataHandler); - auto &mapRef = _mapDataHandler[strPeerIp]; - mapRef.emplace(pSelf, cb); +void UDPServer::listenPeer(const char* peer_ip, void* obj, const onRecvData &cb) { + lock_guard lck(_mtx_on_recv); + auto &ref = _on_recv_map[peer_ip]; + ref.emplace(obj, cb); } -void UDPServer::stopListenPeer(const char* strPeerIp, void* pSelf) { - lock_guard lck(_mtxDataHandler); - auto it0 = _mapDataHandler.find(strPeerIp); - if (it0 == _mapDataHandler.end()) { +void UDPServer::stopListenPeer(const char* peer_ip, void* obj) { + lock_guard lck(_mtx_on_recv); + auto it0 = _on_recv_map.find(peer_ip); + if (it0 == _on_recv_map.end()) { return; } - auto &mapRef = it0->second; - auto it1 = mapRef.find(pSelf); - if (it1 != mapRef.end()) { - mapRef.erase(it1); + auto &ref = it0->second; + auto it1 = ref.find(obj); + if (it1 != ref.end()) { + ref.erase(it1); } - if (mapRef.size() == 0) { - _mapDataHandler.erase(it0); + if (ref.size() == 0) { + _on_recv_map.erase(it0); } - } -void UDPServer::onErr(const string& strKey, const SockException& err) { + +void UDPServer::onErr(const string &key, const SockException &err) { WarnL << err.what(); - lock_guard lck(_mtxUpdSock); - _mapUpdSock.erase(strKey); + lock_guard lck(_mtx_udp_sock); + _udp_sock_map.erase(key); } -void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) { - //TraceL << trackIndex; - struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr; - string peerIp = SockUtil::inet_ntoa(in->sin_addr); - lock_guard lck(_mtxDataHandler); - auto it0 = _mapDataHandler.find(peerIp); - if (it0 == _mapDataHandler.end()) { + +void UDPServer::onRecv(int interleaved, const Buffer::Ptr &buf, struct sockaddr* peer_addr) { + struct sockaddr_in *in = (struct sockaddr_in *) peer_addr; + string peer_ip = SockUtil::inet_ntoa(in->sin_addr); + lock_guard lck(_mtx_on_recv); + auto it0 = _on_recv_map.find(peer_ip); + if (it0 == _on_recv_map.end()) { return; } - auto &mapRef = it0->second; - for (auto it1 = mapRef.begin(); it1 != mapRef.end(); ++it1) { - onRecvData &funRef = it1->second; - if (!funRef(intervaled, pBuf, pPeerAddr)) { - it1 = mapRef.erase(it1); + auto &ref = it0->second; + for (auto it1 = ref.begin(); it1 != ref.end(); ++it1) { + auto &func = it1->second; + if (!func(interleaved, buf, peer_addr)) { + it1 = ref.erase(it1); } } - if (mapRef.size() == 0) { - _mapDataHandler.erase(it0); + if (ref.size() == 0) { + _on_recv_map.erase(it0); } } diff --git a/src/Rtsp/UDPServer.h b/src/Rtsp/UDPServer.h index d7c0e487..137eb576 100644 --- a/src/Rtsp/UDPServer.h +++ b/src/Rtsp/UDPServer.h @@ -30,18 +30,20 @@ public: typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData; ~UDPServer(); static UDPServer &Instance(); - Socket::Ptr getSock(const EventPoller::Ptr &poller,const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0); - void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); - void stopListenPeer(const char *strPeerIp, void *pSelf); + Socket::Ptr getSock(SocketHelper &helper, const char *local_ip, int interleaved, uint16_t local_port = 0); + void listenPeer(const char *peer_ip, void *obj, const onRecvData &cb); + void stopListenPeer(const char *peer_ip, void *obj); + private: UDPServer(); - void onRcvData(int intervaled, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); + void onRecv(int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr); void onErr(const string &strKey,const SockException &err); - unordered_map _mapUpdSock; - mutex _mtxUpdSock; - unordered_map > _mapDataHandler; - mutex _mtxDataHandler; +private: + mutex _mtx_udp_sock; + mutex _mtx_on_recv; + unordered_map _udp_sock_map; + unordered_map > _on_recv_map; }; } /* namespace mediakit */