添加流量统计事件

This commit is contained in:
xia-chu 2021-04-07 18:35:38 +08:00
parent 0c5fa36e4d
commit a22a6bafb7
3 changed files with 38 additions and 4 deletions

View File

@ -1104,7 +1104,7 @@ void installWebApi() {
throw runtime_error(StrPrinter << "播放鉴权失败:" << err); throw runtime_error(StrPrinter << "播放鉴权失败:" << err);
} }
auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller());
rtc->attach(src, true); rtc->attach(src, info, true);
val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["sdp"] = rtc->getAnswerSdp(offer_sdp);
val["type"] = "answer"; val["type"] = "answer";
invoker(200, headerOut, val.toStyledString()); invoker(200, headerOut, val.toStyledString());
@ -1138,7 +1138,7 @@ void installWebApi() {
push_src->setProtocolTranslation(enableHls, enableMP4); push_src->setProtocolTranslation(enableHls, enableMP4);
auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller());
push_src->setListener(rtc); push_src->setListener(rtc);
rtc->attach(push_src, false); rtc->attach(push_src, info, false);
val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["sdp"] = rtc->getAnswerSdp(offer_sdp);
val["type"] = "answer"; val["type"] = "answer";
invoker(200, headerOut, val.toStyledString()); invoker(200, headerOut, val.toStyledString());

View File

@ -285,10 +285,37 @@ WebRtcTransportImp::~WebRtcTransportImp() {
void WebRtcTransportImp::onDestory() { void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory(); WebRtcTransport::onDestory();
uint64_t duration = _alive_ticker.createdTime() / 1000;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_play_src) {
WarnP(_socket) << "RTC播放器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束播放,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast<SockInfo &>(*_socket));
}
} }
void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, bool is_play) { if (_push_src) {
WarnP(_socket) << "RTC推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束推流,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast<SockInfo &>(*_socket));
}
}
}
void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) {
assert(src); assert(src);
_media_info = info;
if (is_play) { if (is_play) {
_play_src = src; _play_src = src;
} else { } else {
@ -455,6 +482,7 @@ private:
}; };
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
_bytes_usage += len;
auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len); auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len);
for (auto rtcp : rtcps) { for (auto rtcp : rtcps) {
switch ((RtcpType) rtcp->pt) { switch ((RtcpType) rtcp->pt) {
@ -504,6 +532,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
} }
void WebRtcTransportImp::onRtp(const char *buf, size_t len) { void WebRtcTransportImp::onRtp(const char *buf, size_t len) {
_bytes_usage += len;
_alive_ticker.resetTime(); _alive_ticker.resetTime();
RtpHeader *rtp = (RtpHeader *) buf; RtpHeader *rtp = (RtpHeader *) buf;
//根据接收到的rtp的pt信息找到该流的信息 //根据接收到的rtp的pt信息找到该流的信息
@ -549,6 +578,7 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){
//忽略,对方不支持该编码类型 //忽略,对方不支持该编码类型
return; return;
} }
_bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize;
sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, pt); sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, pt);
//统计rtp发送情况好做sr汇报 //统计rtp发送情况好做sr汇报
_rtp_info_pt[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); _rtp_info_pt[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);

View File

@ -127,7 +127,7 @@ public:
* @param src * @param src
* @param is_play * @param is_play
*/ */
void attach(const RtspMediaSource::Ptr &src, bool is_play = true); void attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play = true);
protected: protected:
void onStartWebRTC() override; void onStartWebRTC() override;
@ -176,6 +176,10 @@ private:
void onBeforeSortedRtp(const RtpPayloadInfo &info,const RtpPacket::Ptr &rtp); void onBeforeSortedRtp(const RtpPayloadInfo &info,const RtpPacket::Ptr &rtp);
private: private:
//用掉的总流量
uint64_t _bytes_usage = 0;
//媒体相关元数据
MediaInfo _media_info;
//保持自我强引用 //保持自我强引用
Ptr _self; Ptr _self;
//检测超时的定时器 //检测超时的定时器