Merge branch 'master' of github.com:ZLMediaKit/ZLMediaKit

This commit is contained in:
ziyue 2021-11-30 18:05:06 +08:00
commit 78bcd9e868
9 changed files with 73 additions and 55 deletions

@ -1 +1 @@
Subproject commit 28c3d20c3a94b6e72bcd2d4e7057d28b8215594a Subproject commit 2bb688aa9b29b79f262e3f90613c177ebac109ac

View File

@ -164,6 +164,7 @@ bash build_docker_images.sh
- [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk) - [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk)
- 播放器 - 播放器
- [基于wasm支持H265的播放器](https://github.com/numberwolf/h265web.js)
- [基于MSE的websocket-fmp4播放器](https://github.com/v354412101/wsPlayer) - [基于MSE的websocket-fmp4播放器](https://github.com/v354412101/wsPlayer)
## 授权协议 ## 授权协议

View File

@ -194,8 +194,6 @@ int start_main(int argc,char *argv[]) {
string ssl_file = cmd_main["ssl"]; string ssl_file = cmd_main["ssl"];
int threads = cmd_main["threads"]; int threads = cmd_main["threads"];
setThreadName("main thread");
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("ConsoleChannel", logLevel)); Logger::Instance().add(std::make_shared<ConsoleChannel>("ConsoleChannel", logLevel));
#ifndef ANDROID #ifndef ANDROID
@ -248,28 +246,28 @@ int start_main(int argc,char *argv[]) {
//简单的telnet服务器可用于服务器调试但是不能使用23端口否则telnet上了莫名其妙的现象 //简单的telnet服务器可用于服务器调试但是不能使用23端口否则telnet上了莫名其妙的现象
//测试方法:telnet 127.0.0.1 9000 //测试方法:telnet 127.0.0.1 9000
TcpServer::Ptr shellSrv = std::make_shared<TcpServer>(); auto shellSrv = std::make_shared<TcpServer>();
//rtsp[s]服务器, 可用于诸如亚马逊echo show这样的设备访问 //rtsp[s]服务器, 可用于诸如亚马逊echo show这样的设备访问
TcpServer::Ptr rtspSrv = std::make_shared<TcpServer>();; auto rtspSrv = std::make_shared<TcpServer>();;
TcpServer::Ptr rtspSSLSrv = std::make_shared<TcpServer>();; auto rtspSSLSrv = std::make_shared<TcpServer>();;
//rtmp[s]服务器 //rtmp[s]服务器
TcpServer::Ptr rtmpSrv = std::make_shared<TcpServer>();; auto rtmpSrv = std::make_shared<TcpServer>();;
TcpServer::Ptr rtmpsSrv = std::make_shared<TcpServer>();; auto rtmpsSrv = std::make_shared<TcpServer>();;
//http[s]服务器 //http[s]服务器
TcpServer::Ptr httpSrv = std::make_shared<TcpServer>();; auto httpSrv = std::make_shared<TcpServer>();;
TcpServer::Ptr httpsSrv = std::make_shared<TcpServer>();; auto httpsSrv = std::make_shared<TcpServer>();;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
//GB28181 rtp推流端口支持UDP/TCP //GB28181 rtp推流端口支持UDP/TCP
RtpServer::Ptr rtpServer = std::make_shared<RtpServer>(); auto rtpServer = std::make_shared<RtpServer>();
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)
#if defined(ENABLE_WEBRTC) #if defined(ENABLE_WEBRTC)
//webrtc udp服务器 //webrtc udp服务器
UdpServer::Ptr rtcSrv = std::make_shared<UdpServer>(); auto rtcSrv = std::make_shared<UdpServer>();
rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) { rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
if (!buf) { if (!buf) {
return Socket::createSocket(poller, false); return Socket::createSocket(poller, false);

View File

@ -268,10 +268,6 @@ void H264RtpEncoder::packRtpStapA(const char *ptr, size_t len, uint32_t pts, boo
bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
auto ptr = frame->data() + frame->prefixSize(); auto ptr = frame->data() + frame->prefixSize();
switch (H264_TYPE(ptr[0])) { switch (H264_TYPE(ptr[0])) {
case H264Frame::NAL_AUD:
case H264Frame::NAL_SEI : {
return false;
}
case H264Frame::NAL_SPS: { case H264Frame::NAL_SPS: {
_sps = Frame::getCacheAbleFrame(frame); _sps = Frame::getCacheAbleFrame(frame);
return true; return true;

View File

@ -1737,16 +1737,21 @@ RETRY:
void RtcConfigure::setPlayRtspInfo(const string &sdp){ void RtcConfigure::setPlayRtspInfo(const string &sdp){
RtcSession session; RtcSession session;
video.direction = RtpDirection::inactive;
audio.direction = RtpDirection::inactive;
session.loadFrom(sdp); session.loadFrom(sdp);
for (auto &m : session.media) { for (auto &m : session.media) {
switch (m.type) { switch (m.type) {
case TrackVideo : { case TrackVideo : {
video.direction = RtpDirection::sendonly;
_rtsp_video_plan = std::make_shared<RtcCodecPlan>(m.plan[0]); _rtsp_video_plan = std::make_shared<RtcCodecPlan>(m.plan[0]);
video.preferred_codec.clear(); video.preferred_codec.clear();
video.preferred_codec.emplace_back(getCodecId(_rtsp_video_plan->codec)); video.preferred_codec.emplace_back(getCodecId(_rtsp_video_plan->codec));
break; break;
} }
case TrackAudio : { case TrackAudio : {
audio.direction = RtpDirection::sendonly;
_rtsp_audio_plan = std::make_shared<RtcCodecPlan>(m.plan[0]); _rtsp_audio_plan = std::make_shared<RtcCodecPlan>(m.plan[0]);
audio.preferred_codec.clear(); audio.preferred_codec.clear();
audio.preferred_codec.emplace_back(getCodecId(_rtsp_audio_plan->codec)); audio.preferred_codec.emplace_back(getCodecId(_rtsp_audio_plan->codec));

View File

@ -21,7 +21,7 @@ enum class ExtSeqStatus : int {
void TwccContext::onRtp(uint32_t ssrc, uint16_t twcc_ext_seq, uint64_t stamp_ms) { void TwccContext::onRtp(uint32_t ssrc, uint16_t twcc_ext_seq, uint64_t stamp_ms) {
switch ((ExtSeqStatus) checkSeqStatus(twcc_ext_seq)) { switch ((ExtSeqStatus) checkSeqStatus(twcc_ext_seq)) {
case ExtSeqStatus::jumped: /*回环后收到回环前的大ext seq包,过滤掉*/ return; case ExtSeqStatus::jumped: /*seq异常,过滤掉*/ return;
case ExtSeqStatus::looped: /*回环触发发送twcc rtcp*/ onSendTwcc(ssrc); break; case ExtSeqStatus::looped: /*回环触发发送twcc rtcp*/ onSendTwcc(ssrc); break;
case ExtSeqStatus::normal: break; case ExtSeqStatus::normal: break;
default: /*不可达*/assert(0); break; default: /*不可达*/assert(0); break;
@ -56,17 +56,30 @@ int TwccContext::checkSeqStatus(uint16_t twcc_ext_seq) const {
return (int) ExtSeqStatus::normal; return (int) ExtSeqStatus::normal;
} }
auto max = _rtp_recv_status.rbegin()->first; auto max = _rtp_recv_status.rbegin()->first;
if (max > 0xFF00 && twcc_ext_seq < 0xFF) { auto delta = (int32_t) twcc_ext_seq - (int32_t) max;
//发生回环了 if (delta > 0 && delta < 0xFFFF / 2) {
//正常增长
return (int) ExtSeqStatus::normal;
}
if (delta < -0xFF00) {
//回环
TraceL << "rtp twcc ext seq looped:" << max << " -> " << twcc_ext_seq; TraceL << "rtp twcc ext seq looped:" << max << " -> " << twcc_ext_seq;
return (int) ExtSeqStatus::looped; return (int) ExtSeqStatus::looped;
} }
if (twcc_ext_seq - max > 0xFFFF / 2) { if (delta > 0xFF00) {
TraceL << "rtp twcc ext seq jumped:" << max << " -> " << twcc_ext_seq; //回环后收到前面大的乱序的包,无法处理,丢弃
TraceL << "rtp twcc ext seq jumped after looped:" << max << " -> " << twcc_ext_seq;
return (int) ExtSeqStatus::jumped; return (int) ExtSeqStatus::jumped;
} }
auto min = _rtp_recv_status.begin()->first;
if (min <= twcc_ext_seq || twcc_ext_seq <= max) {
//正常回退
return (int) ExtSeqStatus::normal; return (int) ExtSeqStatus::normal;
} }
//seq莫名的大幅增加或减少无法处理丢弃
TraceL << "rtp twcc ext seq jumped:" << max << " -> " << twcc_ext_seq;
return (int) ExtSeqStatus::jumped;
}
void TwccContext::onSendTwcc(uint32_t ssrc) { void TwccContext::onSendTwcc(uint32_t ssrc) {
auto max = _rtp_recv_status.rbegin()->first; auto max = _rtp_recv_status.rbegin()->first;

View File

@ -55,7 +55,7 @@ void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
//只允许寻找一次transport //只允许寻找一次transport
_find_transport = false; _find_transport = false;
auto user_name = getUserName(buffer); auto user_name = getUserName(buffer);
_identifier = user_name + '-' + to_string(reinterpret_cast<uint64_t>(this)); _identifier = to_string(getSock()->rawFD()) + '-' + user_name;
auto transport = WebRtcTransportManager::Instance().getItem(user_name); auto transport = WebRtcTransportManager::Instance().getItem(user_name);
CHECK(transport && transport->getPoller()->isCurrentThread()); CHECK(transport && transport->getPoller()->isCurrentThread());
transport->setSession(shared_from_this()); transport->setSession(shared_from_this());

View File

@ -43,9 +43,12 @@ static onceToken token([]() {
}//namespace RTC }//namespace RTC
static atomic<uint64_t> s_key{0};
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) { WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) {
_poller = poller; _poller = poller;
_identifier = to_string(reinterpret_cast<uint64_t>(this)); _identifier = "zlm_"+to_string(++s_key);
_packet_pool.setSize(64);
} }
void WebRtcTransport::onCreate(){ void WebRtcTransport::onCreate(){
@ -69,7 +72,7 @@ const string &WebRtcTransport::getIdentifier() const {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) { void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
onSendSockData((char *) packet->GetData(), packet->GetSize(), (struct sockaddr_in *) tuple); sendSockData((char *) packet->GetData(), packet->GetSize(), tuple);
} }
void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) { void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) {
@ -110,7 +113,7 @@ void WebRtcTransport::OnDtlsTransportConnected(
} }
void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) { void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
onSendSockData((char *)data, len); sendSockData((char *)data, len, nullptr);
} }
void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) { void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) {
@ -132,10 +135,10 @@ void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTran
} }
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::onSendSockData(const char *buf, size_t len, bool flush){ void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple){
auto tuple = _ice_server->GetSelectedTuple(); auto pkt = _packet_pool.obtain();
assert(tuple); pkt->assign(buf, len);
onSendSockData(buf, len, (struct sockaddr_in *) tuple, flush); onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple());
} }
RTC::TransportTuple* WebRtcTransport::getSelectedTuple() const{ RTC::TransportTuple* WebRtcTransport::getSelectedTuple() const{
@ -266,23 +269,28 @@ void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tup
void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx) { void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx) {
if (_srtp_session_send) { if (_srtp_session_send) {
auto pkt = _packet_pool.obtain();
//预留rtx加入的两个字节 //预留rtx加入的两个字节
CHECK((size_t)len + SRTP_MAX_TRAILER_LEN + 2 <= sizeof(_srtp_buf)); pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2);
memcpy(_srtp_buf, buf, len); pkt->assign(buf, len);
onBeforeEncryptRtp((char *) _srtp_buf, len, ctx); onBeforeEncryptRtp(pkt->data(), len, ctx);
if (_srtp_session_send->EncryptRtp(_srtp_buf, &len)) { if (_srtp_session_send->EncryptRtp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
onSendSockData((char *) _srtp_buf, len, flush); pkt->setSize(len);
onSendSockData(std::move(pkt), flush);
} }
} }
} }
void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx) { void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx) {
if (_srtp_session_send) { if (_srtp_session_send) {
CHECK((size_t)len + SRTP_MAX_TRAILER_LEN <= sizeof(_srtp_buf)); auto pkt = _packet_pool.obtain();
memcpy(_srtp_buf, buf, len); //预留rtx加入的两个字节
onBeforeEncryptRtcp((char *) _srtp_buf, len, ctx); pkt->setCapacity((size_t) len + SRTP_MAX_TRAILER_LEN + 2);
if (_srtp_session_send->EncryptRtcp(_srtp_buf, &len)) { pkt->assign(buf, len);
onSendSockData((char *) _srtp_buf, len, flush); onBeforeEncryptRtcp(pkt->data(), len, ctx);
if (_srtp_session_send->EncryptRtcp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
pkt->setSize(len);
onSendSockData(std::move(pkt), flush);
} }
} }
} }
@ -313,7 +321,6 @@ void WebRtcTransportImp::onCreate(){
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) { WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) {
InfoL << getIdentifier(); InfoL << getIdentifier();
_packet_pool.setSize(64);
} }
WebRtcTransportImp::~WebRtcTransportImp() { WebRtcTransportImp::~WebRtcTransportImp() {
@ -325,16 +332,14 @@ void WebRtcTransportImp::onDestory() {
unregisterSelf(); unregisterSelf();
} }
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) { void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple) {
if (!_session) { if (!_session) {
WarnL << "send data failed:" << len; WarnL << "send data failed:" << buf->size();
return; return;
} }
auto ptr = _packet_pool.obtain();
ptr->assign(buf, len);
//一次性发送一帧的rtp数据提高网络io性能 //一次性发送一帧的rtp数据提高网络io性能
_session->setSendFlushFlag(flush); _session->setSendFlushFlag(flush);
_session->send(std::move(ptr)); _session->send(std::move(buf));
} }
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
@ -807,7 +812,8 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r
} }
#endif #endif
} else { } else {
WarnL << "send rtx rtp:" << rtp->getSeq(); //发送rtx重传包
TraceL << "send rtx rtp:" << rtp->getSeq();
} }
pair<bool/*rtx*/, MediaTrack *> ctx{rtx, track.get()}; pair<bool/*rtx*/, MediaTrack *> ctx{rtx, track.get()};
sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx); sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, &ctx);

View File

@ -135,7 +135,7 @@ protected:
virtual void onStartWebRTC() = 0; virtual void onStartWebRTC() = 0;
virtual void onRtcConfigure(RtcConfigure &configure) const; virtual void onRtcConfigure(RtcConfigure &configure) const;
virtual void onCheckSdp(SdpType type, RtcSession &sdp) = 0; virtual void onCheckSdp(SdpType type, RtcSession &sdp) = 0;
virtual void onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush = true) = 0; virtual void onSendSockData(Buffer::Ptr buf, bool flush = true, RTC::TransportTuple *tuple = nullptr) = 0;
virtual void onRtp(const char *buf, size_t len, uint64_t stamp_ms) = 0; virtual void onRtp(const char *buf, size_t len, uint64_t stamp_ms) = 0;
virtual void onRtcp(const char *buf, size_t len) = 0; virtual void onRtcp(const char *buf, size_t len) = 0;
@ -149,7 +149,7 @@ protected:
void sendRtcpPli(uint32_t ssrc); void sendRtcpPli(uint32_t ssrc);
private: private:
void onSendSockData(const char *buf, size_t len, bool flush = true); void sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple);
void setRemoteDtlsFingerprint(const RtcSession &remote); void setRemoteDtlsFingerprint(const RtcSession &remote);
protected: protected:
@ -157,7 +157,6 @@ protected:
RtcSession::Ptr _answer_sdp; RtcSession::Ptr _answer_sdp;
private: private:
uint8_t _srtp_buf[2000];
string _identifier; string _identifier;
EventPoller::Ptr _poller; EventPoller::Ptr _poller;
std::shared_ptr<RTC::IceServer> _ice_server; std::shared_ptr<RTC::IceServer> _ice_server;
@ -165,6 +164,8 @@ private:
std::shared_ptr<RTC::SrtpSession> _srtp_session_send; std::shared_ptr<RTC::SrtpSession> _srtp_session_send;
std::shared_ptr<RTC::SrtpSession> _srtp_session_recv; std::shared_ptr<RTC::SrtpSession> _srtp_session_recv;
Ticker _ticker; Ticker _ticker;
//循环池
ResourcePool<BufferRaw> _packet_pool;
}; };
class RtpChannel; class RtpChannel;
@ -232,7 +233,7 @@ public:
protected: protected:
WebRtcTransportImp(const EventPoller::Ptr &poller); WebRtcTransportImp(const EventPoller::Ptr &poller);
void onStartWebRTC() override; void onStartWebRTC() override;
void onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush = true) override; void onSendSockData(Buffer::Ptr buf, bool flush = true, RTC::TransportTuple *tuple = nullptr) override;
void onCheckSdp(SdpType type, RtcSession &sdp) override; void onCheckSdp(SdpType type, RtcSession &sdp) override;
void onRtcConfigure(RtcConfigure &configure) const override; void onRtcConfigure(RtcConfigure &configure) const override;
@ -279,8 +280,6 @@ private:
unordered_map<uint32_t/*ssrc*/, MediaTrack::Ptr> _ssrc_to_track; unordered_map<uint32_t/*ssrc*/, MediaTrack::Ptr> _ssrc_to_track;
//根据接收rtp的pt获取相关信息 //根据接收rtp的pt获取相关信息
unordered_map<uint8_t/*pt*/, std::unique_ptr<WrappedMediaTrack>> _pt_to_track; unordered_map<uint8_t/*pt*/, std::unique_ptr<WrappedMediaTrack>> _pt_to_track;
//循环池
ResourcePool<BufferRaw> _packet_pool;
}; };
class WebRtcTransportManager { class WebRtcTransportManager {