diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 28c3d20c..2bb688aa 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 28c3d20c3a94b6e72bcd2d4e7057d28b8215594a +Subproject commit 2bb688aa9b29b79f262e3f90613c177ebac109ac diff --git a/README.md b/README.md index 4e79e725..0c46d8c5 100644 --- a/README.md +++ b/README.md @@ -164,8 +164,9 @@ bash build_docker_images.sh - [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk) - 播放器 + - [基于wasm支持H265的播放器](https://github.com/numberwolf/h265web.js) - [基于MSE的websocket-fmp4播放器](https://github.com/v354412101/wsPlayer) - + ## 授权协议 本项目自有代码使用宽松的MIT协议,在保留版权信息的情况下可以自由应用于各自商用、非商业的项目。 diff --git a/server/main.cpp b/server/main.cpp index bb972bfb..0842f321 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -194,8 +194,6 @@ int start_main(int argc,char *argv[]) { string ssl_file = cmd_main["ssl"]; int threads = cmd_main["threads"]; - setThreadName("main thread"); - //设置日志 Logger::Instance().add(std::make_shared("ConsoleChannel", logLevel)); #ifndef ANDROID @@ -248,28 +246,28 @@ int start_main(int argc,char *argv[]) { //简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象 //测试方法:telnet 127.0.0.1 9000 - TcpServer::Ptr shellSrv = std::make_shared(); + auto shellSrv = std::make_shared(); //rtsp[s]服务器, 可用于诸如亚马逊echo show这样的设备访问 - TcpServer::Ptr rtspSrv = std::make_shared();; - TcpServer::Ptr rtspSSLSrv = std::make_shared();; + auto rtspSrv = std::make_shared();; + auto rtspSSLSrv = std::make_shared();; //rtmp[s]服务器 - TcpServer::Ptr rtmpSrv = std::make_shared();; - TcpServer::Ptr rtmpsSrv = std::make_shared();; + auto rtmpSrv = std::make_shared();; + auto rtmpsSrv = std::make_shared();; //http[s]服务器 - TcpServer::Ptr httpSrv = std::make_shared();; - TcpServer::Ptr httpsSrv = std::make_shared();; + auto httpSrv = std::make_shared();; + auto httpsSrv = std::make_shared();; #if defined(ENABLE_RTPPROXY) //GB28181 rtp推流端口,支持UDP/TCP - RtpServer::Ptr rtpServer = std::make_shared(); + auto rtpServer = std::make_shared(); #endif//defined(ENABLE_RTPPROXY) #if defined(ENABLE_WEBRTC) //webrtc udp服务器 - UdpServer::Ptr rtcSrv = std::make_shared(); + auto rtcSrv = std::make_shared(); rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { if (!buf) { return Socket::createSocket(poller, false); diff --git a/src/Extension/H264Rtp.cpp b/src/Extension/H264Rtp.cpp index 143d3a94..e28cfa3d 100644 --- a/src/Extension/H264Rtp.cpp +++ b/src/Extension/H264Rtp.cpp @@ -268,10 +268,6 @@ void H264RtpEncoder::packRtpStapA(const char *ptr, size_t len, uint32_t pts, boo bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { auto ptr = frame->data() + frame->prefixSize(); switch (H264_TYPE(ptr[0])) { - case H264Frame::NAL_AUD: - case H264Frame::NAL_SEI : { - return false; - } case H264Frame::NAL_SPS: { _sps = Frame::getCacheAbleFrame(frame); return true; diff --git a/webrtc/Sdp.cpp b/webrtc/Sdp.cpp index 0831a1cf..82f28acf 100644 --- a/webrtc/Sdp.cpp +++ b/webrtc/Sdp.cpp @@ -1737,16 +1737,21 @@ RETRY: void RtcConfigure::setPlayRtspInfo(const string &sdp){ RtcSession session; + video.direction = RtpDirection::inactive; + audio.direction = RtpDirection::inactive; + session.loadFrom(sdp); for (auto &m : session.media) { switch (m.type) { case TrackVideo : { + video.direction = RtpDirection::sendonly; _rtsp_video_plan = std::make_shared(m.plan[0]); video.preferred_codec.clear(); video.preferred_codec.emplace_back(getCodecId(_rtsp_video_plan->codec)); break; } case TrackAudio : { + audio.direction = RtpDirection::sendonly; _rtsp_audio_plan = std::make_shared(m.plan[0]); audio.preferred_codec.clear(); audio.preferred_codec.emplace_back(getCodecId(_rtsp_audio_plan->codec)); diff --git a/webrtc/TwccContext.cpp b/webrtc/TwccContext.cpp index 6a3bfbd6..ed565186 100644 --- a/webrtc/TwccContext.cpp +++ b/webrtc/TwccContext.cpp @@ -21,7 +21,7 @@ enum class ExtSeqStatus : int { void TwccContext::onRtp(uint32_t ssrc, uint16_t twcc_ext_seq, uint64_t stamp_ms) { switch ((ExtSeqStatus) checkSeqStatus(twcc_ext_seq)) { - case ExtSeqStatus::jumped: /*回环后,收到回环前的大ext seq包,过滤掉*/ return; + case ExtSeqStatus::jumped: /*seq异常,过滤掉*/ return; case ExtSeqStatus::looped: /*回环,触发发送twcc rtcp*/ onSendTwcc(ssrc); break; case ExtSeqStatus::normal: break; default: /*不可达*/assert(0); break; @@ -56,16 +56,29 @@ int TwccContext::checkSeqStatus(uint16_t twcc_ext_seq) const { return (int) ExtSeqStatus::normal; } auto max = _rtp_recv_status.rbegin()->first; - if (max > 0xFF00 && twcc_ext_seq < 0xFF) { - //发生回环了 + auto delta = (int32_t) twcc_ext_seq - (int32_t) max; + if (delta > 0 && delta < 0xFFFF / 2) { + //正常增长 + return (int) ExtSeqStatus::normal; + } + if (delta < -0xFF00) { + //回环 TraceL << "rtp twcc ext seq looped:" << max << " -> " << twcc_ext_seq; return (int) ExtSeqStatus::looped; } - if (twcc_ext_seq - max > 0xFFFF / 2) { - TraceL << "rtp twcc ext seq jumped:" << max << " -> " << twcc_ext_seq; + if (delta > 0xFF00) { + //回环后收到前面大的乱序的包,无法处理,丢弃 + TraceL << "rtp twcc ext seq jumped after looped:" << max << " -> " << twcc_ext_seq; return (int) ExtSeqStatus::jumped; } - return (int) ExtSeqStatus::normal; + auto min = _rtp_recv_status.begin()->first; + if (min <= twcc_ext_seq || twcc_ext_seq <= max) { + //正常回退 + return (int) ExtSeqStatus::normal; + } + //seq莫名的大幅增加或减少,无法处理,丢弃 + TraceL << "rtp twcc ext seq jumped:" << max << " -> " << twcc_ext_seq; + return (int) ExtSeqStatus::jumped; } void TwccContext::onSendTwcc(uint32_t ssrc) { diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index 1443269b..6a83ac06 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -55,7 +55,7 @@ void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { //只允许寻找一次transport _find_transport = false; auto user_name = getUserName(buffer); - _identifier = user_name + '-' + to_string(reinterpret_cast(this)); + _identifier = to_string(getSock()->rawFD()) + '-' + user_name; auto transport = WebRtcTransportManager::Instance().getItem(user_name); CHECK(transport && transport->getPoller()->isCurrentThread()); transport->setSession(shared_from_this()); diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 2f396374..317140b8 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -43,9 +43,12 @@ static onceToken token([]() { }//namespace RTC +static atomic s_key{0}; + WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { _poller = poller; - _identifier = to_string(reinterpret_cast(this)); + _identifier = "zlm_"+to_string(++s_key); + _packet_pool.setSize(64); } void WebRtcTransport::onCreate(){ @@ -69,7 +72,7 @@ const string &WebRtcTransport::getIdentifier() const { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) { - onSendSockData((char *) packet->GetData(), packet->GetSize(), (struct sockaddr_in *) tuple); + sendSockData((char *) packet->GetData(), packet->GetSize(), tuple); } void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) { @@ -110,7 +113,7 @@ void WebRtcTransport::OnDtlsTransportConnected( } void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) { - onSendSockData((char *)data, len); + sendSockData((char *)data, len, nullptr); } void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) { @@ -132,10 +135,10 @@ void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTran } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void WebRtcTransport::onSendSockData(const char *buf, size_t len, bool flush){ - auto tuple = _ice_server->GetSelectedTuple(); - assert(tuple); - onSendSockData(buf, len, (struct sockaddr_in *) tuple, flush); +void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple){ + auto pkt = _packet_pool.obtain(); + pkt->assign(buf, len); + onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple()); } RTC::TransportTuple* WebRtcTransport::getSelectedTuple() const{ @@ -266,23 +269,28 @@ void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tup void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx) { if (_srtp_session_send) { + auto pkt = _packet_pool.obtain(); //预留rtx加入的两个字节 - CHECK((size_t)len + SRTP_MAX_TRAILER_LEN + 2 <= sizeof(_srtp_buf)); - memcpy(_srtp_buf, buf, len); - onBeforeEncryptRtp((char *) _srtp_buf, len, ctx); - if (_srtp_session_send->EncryptRtp(_srtp_buf, &len)) { - onSendSockData((char *) _srtp_buf, len, flush); + pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2); + pkt->assign(buf, len); + onBeforeEncryptRtp(pkt->data(), len, ctx); + if (_srtp_session_send->EncryptRtp(reinterpret_cast(pkt->data()), &len)) { + pkt->setSize(len); + onSendSockData(std::move(pkt), flush); } } } -void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx){ +void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx) { if (_srtp_session_send) { - CHECK((size_t)len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf)); - memcpy(_srtp_buf, buf, len); - onBeforeEncryptRtcp((char *) _srtp_buf, len, ctx); - if (_srtp_session_send->EncryptRtcp(_srtp_buf, &len)) { - onSendSockData((char *) _srtp_buf, len, flush); + auto pkt = _packet_pool.obtain(); + //预留rtx加入的两个字节 + pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2); + pkt->assign(buf, len); + onBeforeEncryptRtcp(pkt->data(), len, ctx); + if (_srtp_session_send->EncryptRtcp(reinterpret_cast(pkt->data()), &len)) { + pkt->setSize(len); + onSendSockData(std::move(pkt), flush); } } } @@ -313,7 +321,6 @@ void WebRtcTransportImp::onCreate(){ WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) { InfoL << getIdentifier(); - _packet_pool.setSize(64); } WebRtcTransportImp::~WebRtcTransportImp() { @@ -325,16 +332,14 @@ void WebRtcTransportImp::onDestory() { unregisterSelf(); } -void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { +void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple) { if (!_session) { - WarnL << "send data failed:" << len; + WarnL << "send data failed:" << buf->size(); return; } - auto ptr = _packet_pool.obtain(); - ptr->assign(buf, len); //一次性发送一帧的rtp数据,提高网络io性能 _session->setSendFlushFlag(flush); - _session->send(std::move(ptr)); + _session->send(std::move(buf)); } /////////////////////////////////////////////////////////////////// @@ -807,7 +812,8 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r } #endif } else { - WarnL << "send rtx rtp:" << rtp->getSeq(); + //发送rtx重传包 + TraceL << "send rtx rtp:" << rtp->getSeq(); } pair ctx{rtx, track.get()}; sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx); diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 2879944a..d557b710 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -135,7 +135,7 @@ protected: virtual void onStartWebRTC() = 0; virtual void onRtcConfigure(RtcConfigure &configure) const; virtual void onCheckSdp(SdpType type, RtcSession &sdp) = 0; - virtual void onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush = true) = 0; + virtual void onSendSockData(Buffer::Ptr buf, bool flush = true, RTC::TransportTuple *tuple = nullptr) = 0; virtual void onRtp(const char *buf, size_t len, uint64_t stamp_ms) = 0; virtual void onRtcp(const char *buf, size_t len) = 0; @@ -149,7 +149,7 @@ protected: void sendRtcpPli(uint32_t ssrc); private: - void onSendSockData(const char *buf, size_t len, bool flush = true); + void sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple); void setRemoteDtlsFingerprint(const RtcSession &remote); protected: @@ -157,7 +157,6 @@ protected: RtcSession::Ptr _answer_sdp; private: - uint8_t _srtp_buf[2000]; string _identifier; EventPoller::Ptr _poller; std::shared_ptr _ice_server; @@ -165,6 +164,8 @@ private: std::shared_ptr _srtp_session_send; std::shared_ptr _srtp_session_recv; Ticker _ticker; + //循环池 + ResourcePool _packet_pool; }; class RtpChannel; @@ -232,7 +233,7 @@ public: protected: WebRtcTransportImp(const EventPoller::Ptr &poller); void onStartWebRTC() override; - void onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush = true) override; + void onSendSockData(Buffer::Ptr buf, bool flush = true, RTC::TransportTuple *tuple = nullptr) override; void onCheckSdp(SdpType type, RtcSession &sdp) override; void onRtcConfigure(RtcConfigure &configure) const override; @@ -279,8 +280,6 @@ private: unordered_map _ssrc_to_track; //根据接收rtp的pt获取相关信息 unordered_map> _pt_to_track; - //循环池 - ResourcePool _packet_pool; }; class WebRtcTransportManager {