创建Socket优先绑定本线程

This commit is contained in:
xiongziliang 2019-05-31 18:01:38 +08:00
parent f2e4eac5ef
commit c0f41167ba
7 changed files with 11 additions and 11 deletions

@ -1 +1 @@
Subproject commit aef48ed0ec6989edbc2e9667d3ac0ffa9c023fbc Subproject commit ef90b577856be9f7873adf7617a651e1c23d12f6

View File

@ -105,7 +105,7 @@ RtpBroadCaster::RtpBroadCaster(const EventPoller::Ptr &poller,const string &strL
} }
_multiAddr = MultiCastAddressMaker::Instance().obtain(); _multiAddr = MultiCastAddressMaker::Instance().obtain();
for(auto i = 0; i < 2; i++){ for(auto i = 0; i < 2; i++){
_apUdpSock[i].reset(new Socket()); _apUdpSock[i].reset(new Socket(poller));
if(!_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){ if(!_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){
auto strErr = StrPrinter << "绑定UDP端口失败:" << strLocalIp << endl; auto strErr = StrPrinter << "绑定UDP端口失败:" << strLocalIp << endl;
throw std::runtime_error(strErr); throw std::runtime_error(strErr);

View File

@ -247,12 +247,12 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) {
} }
break; break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
_apRtpSock[trackIndex].reset(new Socket()); _apRtpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apRtpSock[trackIndex].reset(); _apRtpSock[trackIndex].reset();
throw std::runtime_error("open rtp sock err"); throw std::runtime_error("open rtp sock err");
} }
_apRtcpSock[trackIndex].reset(new Socket()); _apRtcpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apRtcpSock[trackIndex]->bindUdpSock(_apRtpSock[trackIndex]->get_local_port() + 1, get_local_ip().data())) { if (!_apRtcpSock[trackIndex]->bindUdpSock(_apRtpSock[trackIndex]->get_local_port() + 1, get_local_ip().data())) {
_apRtcpSock[trackIndex].reset(); _apRtcpSock[trackIndex].reset();
throw std::runtime_error("open rtcp sock err"); throw std::runtime_error("open rtcp sock err");
@ -304,7 +304,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
if (_eType == Rtsp::RTP_MULTICAST) { if (_eType == Rtsp::RTP_MULTICAST) {
//udp组播 //udp组播
auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";"); auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
pRtpSockRef.reset(new Socket()); pRtpSockRef.reset(new Socket(getPoller()));
if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) { if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) {
pRtpSockRef.reset(); pRtpSockRef.reset();
throw std::runtime_error("open udp sock err"); throw std::runtime_error("open udp sock err");

View File

@ -252,7 +252,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) {
} }
break; break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket()); _apUdpSock[trackIndex].reset(new Socket(getPoller()));
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset(); _apUdpSock[trackIndex].reset();
throw std::runtime_error("open udp sock err"); throw std::runtime_error("open udp sock err");
@ -291,7 +291,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex)
uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data());
auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){ if(!pUdpSockRef){
pUdpSockRef.reset(new Socket()); pUdpSockRef.reset(new Socket(getPoller()));
} }
struct sockaddr_in rtpto; struct sockaddr_in rtpto;

View File

@ -679,7 +679,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
int iSrvPort = _pBrdcaster->getPort(trackRef->_type); int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包 //我们用trackIdx区分rtp和rtcp包
//由于组播udp端口是共享的而rtcp端口为组播udp端口+1所以rtcp端口需要改成共享端口 //由于组播udp端口是共享的而rtcp端口为组播udp端口+1所以rtcp端口需要改成共享端口
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) { if (!pSockRtcp) {
//分配端口失败 //分配端口失败
send_NotAcceptable(); send_NotAcceptable();

View File

@ -41,12 +41,12 @@ UDPServer::~UDPServer() {
InfoL; InfoL;
} }
Socket::Ptr UDPServer::getSock(const char* strLocalIp, int intervaled,uint16_t iLocalPort) { Socket::Ptr UDPServer::getSock(const EventPoller::Ptr &poller,const char* strLocalIp, int intervaled,uint16_t iLocalPort) {
lock_guard<mutex> lck(_mtxUpdSock); lock_guard<mutex> lck(_mtxUpdSock);
string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl; string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl;
auto it = _mapUpdSock.find(strKey); auto it = _mapUpdSock.find(strKey);
if (it == _mapUpdSock.end()) { if (it == _mapUpdSock.end()) {
Socket::Ptr pSock(new Socket()); Socket::Ptr pSock(new Socket(poller));
//InfoL<<localIp; //InfoL<<localIp;
if (!pSock->bindUdpSock(iLocalPort, strLocalIp)) { if (!pSock->bindUdpSock(iLocalPort, strLocalIp)) {
//分配失败 //分配失败

View File

@ -46,7 +46,7 @@ public:
typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData; typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData;
~UDPServer(); ~UDPServer();
static UDPServer &Instance(); static UDPServer &Instance();
Socket::Ptr getSock(const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0); Socket::Ptr getSock(const EventPoller::Ptr &poller,const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0);
void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb);
void stopListenPeer(const char *strPeerIp, void *pSelf); void stopListenPeer(const char *strPeerIp, void *pSelf);
private: private: