mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-26 04:31:37 +08:00
rtsp拉流、rtp单端口推流新增支持获取丢包率: #1877
This commit is contained in:
parent
cd269672a5
commit
20f1275c58
@ -219,7 +219,7 @@ bool MediaSource::close(bool force) {
|
|||||||
return listener->close(*this,force);
|
return listener->close(*this,force);
|
||||||
}
|
}
|
||||||
|
|
||||||
int MediaSource::getLossRate(mediakit::TrackType type) {
|
float MediaSource::getLossRate(mediakit::TrackType type) {
|
||||||
auto listener = _listener.lock();
|
auto listener = _listener.lock();
|
||||||
if (!listener) {
|
if (!listener) {
|
||||||
return -1;
|
return -1;
|
||||||
@ -720,7 +720,7 @@ void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int MediaSourceEventInterceptor::getLossRate(MediaSource &sender, TrackType type){
|
float MediaSourceEventInterceptor::getLossRate(MediaSource &sender, TrackType type){
|
||||||
auto listener = _listener.lock();
|
auto listener = _listener.lock();
|
||||||
if (listener) {
|
if (listener) {
|
||||||
return listener->getLossRate(sender, type);
|
return listener->getLossRate(sender, type);
|
||||||
|
@ -87,7 +87,7 @@ public:
|
|||||||
//流注册或注销事件
|
//流注册或注销事件
|
||||||
virtual void onRegist(MediaSource &sender, bool regist) {};
|
virtual void onRegist(MediaSource &sender, bool regist) {};
|
||||||
// 获取丢包率
|
// 获取丢包率
|
||||||
virtual int getLossRate(MediaSource &sender, TrackType type) { return -1; }
|
virtual float getLossRate(MediaSource &sender, TrackType type) { return -1; }
|
||||||
// 获取所在线程, 此函数一般强制重载
|
// 获取所在线程, 此函数一般强制重载
|
||||||
virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller not implemented"); }
|
virtual toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) { throw NotImplemented(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller not implemented"); }
|
||||||
|
|
||||||
@ -164,7 +164,7 @@ public:
|
|||||||
std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const override;
|
std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const override;
|
||||||
void startSendRtp(MediaSource &sender, const SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) override;
|
void startSendRtp(MediaSource &sender, const SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) override;
|
||||||
bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override;
|
bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override;
|
||||||
int getLossRate(MediaSource &sender, TrackType type) override;
|
float getLossRate(MediaSource &sender, TrackType type) override;
|
||||||
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
|
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -280,7 +280,7 @@ public:
|
|||||||
// 停止发送ps-rtp
|
// 停止发送ps-rtp
|
||||||
bool stopSendRtp(const std::string &ssrc);
|
bool stopSendRtp(const std::string &ssrc);
|
||||||
// 获取丢包率
|
// 获取丢包率
|
||||||
int getLossRate(mediakit::TrackType type);
|
float getLossRate(mediakit::TrackType type);
|
||||||
// 获取所在线程
|
// 获取所在线程
|
||||||
toolkit::EventPoller::Ptr getOwnerPoller();
|
toolkit::EventPoller::Ptr getOwnerPoller();
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ public:
|
|||||||
* 获取丢包率,只支持rtsp
|
* 获取丢包率,只支持rtsp
|
||||||
* @param type 音频或视频,TrackInvalid时为总丢包率
|
* @param type 音频或视频,TrackInvalid时为总丢包率
|
||||||
*/
|
*/
|
||||||
virtual float getPacketLossRate(TrackType type) const { return 0; };
|
virtual float getPacketLossRate(TrackType type) const { return -1; };
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取所有track
|
* 获取所有track
|
||||||
|
@ -185,6 +185,10 @@ std::shared_ptr<SockInfo> PlayerProxy::getOriginSock(MediaSource &sender) const
|
|||||||
return getSockInfo();
|
return getSockInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
float PlayerProxy::getLossRate(MediaSource &sender, TrackType type) {
|
||||||
|
return getPacketLossRate(type);
|
||||||
|
}
|
||||||
|
|
||||||
void PlayerProxy::onPlaySuccess() {
|
void PlayerProxy::onPlaySuccess() {
|
||||||
GET_CONFIG(bool, reset_when_replay, General::kResetWhenRePlay);
|
GET_CONFIG(bool, reset_when_replay, General::kResetWhenRePlay);
|
||||||
if (dynamic_pointer_cast<RtspMediaSource>(_media_src)) {
|
if (dynamic_pointer_cast<RtspMediaSource>(_media_src)) {
|
||||||
|
@ -59,6 +59,7 @@ private:
|
|||||||
MediaOriginType getOriginType(MediaSource &sender) const override;
|
MediaOriginType getOriginType(MediaSource &sender) const override;
|
||||||
std::string getOriginUrl(MediaSource &sender) const override;
|
std::string getOriginUrl(MediaSource &sender) const override;
|
||||||
std::shared_ptr<toolkit::SockInfo> getOriginSock(MediaSource &sender) const override;
|
std::shared_ptr<toolkit::SockInfo> getOriginSock(MediaSource &sender) const override;
|
||||||
|
float getLossRate(MediaSource &sender, TrackType type) override;
|
||||||
|
|
||||||
void rePlay(const std::string &strUrl,int iFailedCnt);
|
void rePlay(const std::string &strUrl,int iFailedCnt);
|
||||||
void onPlaySuccess();
|
void onPlaySuccess();
|
||||||
|
@ -95,6 +95,9 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
|
|||||||
_process = std::make_shared<GB28181Process>(_media_info, this);
|
_process = std::make_shared<GB28181Process>(_media_info, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto header = (RtpHeader *) data;
|
||||||
|
onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , 90000/*ps/ts流时间戳按照90K采样率*/, len);
|
||||||
|
|
||||||
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
|
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
|
||||||
if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) {
|
if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) {
|
||||||
//无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据
|
//无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据
|
||||||
@ -280,20 +283,12 @@ toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) {
|
|||||||
return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller();
|
return _sock ? _sock->getPoller() : EventPollerPool::Instance().getPoller();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtpProcess::setHelper(std::weak_ptr<RtcpContext> help) {
|
float RtpProcess::getLossRate(MediaSource &sender, TrackType type) {
|
||||||
_help = std::move(help);
|
auto expected = getExpectedPacketsInterval();
|
||||||
}
|
if (!expected) {
|
||||||
|
|
||||||
int RtpProcess::getLossRate(MediaSource &sender, TrackType type) {
|
|
||||||
auto help = _help.lock();
|
|
||||||
if (!help) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
auto expected = help->getExpectedPacketsInterval();
|
return geLostInterval() * 100 / expected;
|
||||||
if (!expected) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return help->geLostInterval() * 100 / expected;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}//namespace mediakit
|
}//namespace mediakit
|
||||||
|
@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
namespace mediakit {
|
namespace mediakit {
|
||||||
|
|
||||||
class RtpProcess : public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this<RtpProcess>{
|
class RtpProcess : public RtcpContextForRecv, public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this<RtpProcess>{
|
||||||
public:
|
public:
|
||||||
typedef std::shared_ptr<RtpProcess> Ptr;
|
typedef std::shared_ptr<RtpProcess> Ptr;
|
||||||
friend class RtpProcessHelper;
|
friend class RtpProcessHelper;
|
||||||
@ -64,8 +64,6 @@ public:
|
|||||||
uint16_t get_peer_port() override;
|
uint16_t get_peer_port() override;
|
||||||
std::string getIdentifier() const override;
|
std::string getIdentifier() const override;
|
||||||
|
|
||||||
void setHelper(const std::weak_ptr<RtcpContext> help);
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool inputFrame(const Frame::Ptr &frame) override;
|
bool inputFrame(const Frame::Ptr &frame) override;
|
||||||
bool addTrack(const Track::Ptr & track) override;
|
bool addTrack(const Track::Ptr & track) override;
|
||||||
@ -77,7 +75,7 @@ protected:
|
|||||||
std::string getOriginUrl(MediaSource &sender) const override;
|
std::string getOriginUrl(MediaSource &sender) const override;
|
||||||
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
|
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
|
||||||
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
|
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
|
||||||
int getLossRate(MediaSource &sender, TrackType type) override;
|
float getLossRate(MediaSource &sender, TrackType type) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void emitOnPublish();
|
void emitOnPublish();
|
||||||
@ -100,7 +98,6 @@ private:
|
|||||||
toolkit::Ticker _last_check_alive;
|
toolkit::Ticker _last_check_alive;
|
||||||
std::recursive_mutex _func_mtx;
|
std::recursive_mutex _func_mtx;
|
||||||
std::deque<std::function<void()> > _cached_func;
|
std::deque<std::function<void()> > _cached_func;
|
||||||
std::weak_ptr<RtcpContext> _help;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}//namespace mediakit
|
}//namespace mediakit
|
||||||
|
@ -27,19 +27,18 @@ RtpServer::~RtpServer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class RtcpHelper : public RtcpContextForRecv, public std::enable_shared_from_this<RtcpHelper> {
|
class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
|
||||||
public:
|
public:
|
||||||
using Ptr = std::shared_ptr<RtcpHelper>;
|
using Ptr = std::shared_ptr<RtcpHelper>;
|
||||||
|
|
||||||
RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) {
|
RtcpHelper(Socket::Ptr rtcp_sock, RtpProcess::Ptr process) {
|
||||||
_rtcp_sock = std::move(rtcp_sock);
|
_rtcp_sock = std::move(rtcp_sock);
|
||||||
_sample_rate = sample_rate;
|
_process = std::move(process);
|
||||||
}
|
}
|
||||||
|
|
||||||
void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){
|
void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){
|
||||||
//统计rtp接受情况,用于发送rr包
|
//统计rtp接受情况,用于发送rr包
|
||||||
auto header = (RtpHeader *) buf->data();
|
auto header = (RtpHeader *) buf->data();
|
||||||
onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , _sample_rate, buf->size());
|
|
||||||
sendRtcp(ntohl(header->ssrc), addr, addr_len);
|
sendRtcp(ntohl(header->ssrc), addr, addr_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,7 +57,7 @@ public:
|
|||||||
}
|
}
|
||||||
auto rtcps = RtcpHeader::loadFromBytes(buf->data(), buf->size());
|
auto rtcps = RtcpHeader::loadFromBytes(buf->data(), buf->size());
|
||||||
for (auto &rtcp : rtcps) {
|
for (auto &rtcp : rtcps) {
|
||||||
strong_self->onRtcp(rtcp);
|
strong_self->_process->onRtcp(rtcp);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -81,13 +80,13 @@ private:
|
|||||||
//未收到rtcp打洞包时,采用默认的rtcp端口
|
//未收到rtcp打洞包时,采用默认的rtcp端口
|
||||||
rtcp_addr = addr;
|
rtcp_addr = addr;
|
||||||
}
|
}
|
||||||
_rtcp_sock->send(createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len);
|
_rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Ticker _ticker;
|
Ticker _ticker;
|
||||||
Socket::Ptr _rtcp_sock;
|
Socket::Ptr _rtcp_sock;
|
||||||
uint32_t _sample_rate;
|
RtpProcess::Ptr _process;
|
||||||
std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
|
std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -126,8 +125,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
|
|||||||
if (!stream_id.empty()) {
|
if (!stream_id.empty()) {
|
||||||
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
|
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
|
||||||
process = RtpSelector::Instance().getProcess(stream_id, true);
|
process = RtpSelector::Instance().getProcess(stream_id, true);
|
||||||
RtcpHelper::Ptr helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), 90000);
|
RtcpHelper::Ptr helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), process);
|
||||||
process->setHelper(helper);
|
|
||||||
helper->startRtcp();
|
helper->startRtcp();
|
||||||
rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
|
rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
|
||||||
RtpHeader *header = (RtpHeader *)buf->data();
|
RtpHeader *header = (RtpHeader *)buf->data();
|
||||||
|
@ -148,6 +148,6 @@ void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const {
|
|||||||
configure.audio.direction = configure.video.direction = RtpDirection::recvonly;
|
configure.audio.direction = configure.video.direction = RtpDirection::recvonly;
|
||||||
}
|
}
|
||||||
|
|
||||||
int WebRtcPusher::getLossRate(MediaSource &sender,mediakit::TrackType type){
|
float WebRtcPusher::getLossRate(MediaSource &sender,mediakit::TrackType type){
|
||||||
return WebRtcTransportImp::getLossRate(type);
|
return WebRtcTransportImp::getLossRate(type);
|
||||||
}
|
}
|
@ -40,7 +40,7 @@ protected:
|
|||||||
// 获取媒体源客户端相关信息
|
// 获取媒体源客户端相关信息
|
||||||
std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
|
std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
|
||||||
// 获取丢包率
|
// 获取丢包率
|
||||||
int getLossRate(mediakit::MediaSource &sender,mediakit::TrackType type) override;
|
float getLossRate(mediakit::MediaSource &sender,mediakit::TrackType type) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src,
|
WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src,
|
||||||
|
@ -650,10 +650,10 @@ public:
|
|||||||
return _rtcp_context.createRtcpRR(ssrc, getSSRC());
|
return _rtcp_context.createRtcpRR(ssrc, getSSRC());
|
||||||
}
|
}
|
||||||
|
|
||||||
int getLossRate() {
|
float getLossRate() {
|
||||||
auto expected = _rtcp_context.getExpectedPacketsInterval();
|
auto expected = _rtcp_context.getExpectedPacketsInterval();
|
||||||
if (!expected) {
|
if (!expected) {
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
return _rtcp_context.geLostInterval() * 100 / expected;
|
return _rtcp_context.geLostInterval() * 100 / expected;
|
||||||
}
|
}
|
||||||
@ -698,7 +698,7 @@ std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const {
|
|||||||
return it_chn->second;
|
return it_chn->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
int WebRtcTransportImp::getLossRate(mediakit::TrackType type) {
|
float WebRtcTransportImp::getLossRate(mediakit::TrackType type) {
|
||||||
for (auto &pr : _ssrc_to_track) {
|
for (auto &pr : _ssrc_to_track) {
|
||||||
auto ssrc = pr.first;
|
auto ssrc = pr.first;
|
||||||
auto &track = pr.second;
|
auto &track = pr.second;
|
||||||
|
@ -263,7 +263,7 @@ protected:
|
|||||||
void onShutdown(const SockException &ex) override;
|
void onShutdown(const SockException &ex) override;
|
||||||
virtual void onRecvRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp) = 0;
|
virtual void onRecvRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp) = 0;
|
||||||
void updateTicker();
|
void updateTicker();
|
||||||
int getLossRate(mediakit::TrackType type);
|
float getLossRate(mediakit::TrackType type);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void onSortedRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp);
|
void onSortedRtp(MediaTrack &track, const std::string &rid, mediakit::RtpPacket::Ptr rtp);
|
||||||
|
Loading…
Reference in New Issue
Block a user