ZLMediaKit/src/Rtp/RtpProcess.cpp

277 lines
8.6 KiB
C++
Raw Normal View History

2019-12-05 19:20:12 +08:00
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */
2019-12-05 19:20:12 +08:00
2019-12-06 11:54:10 +08:00
#if defined(ENABLE_RTPPROXY)
2020-10-24 23:33:13 +08:00
#include "GB28181Process.h"
2019-12-05 19:20:12 +08:00
#include "RtpProcess.h"
2020-05-17 18:00:23 +08:00
#include "Http/HttpTSPlayer.h"
2020-10-24 23:33:13 +08:00
using namespace std;
using namespace toolkit;
2021-06-23 11:09:53 +08:00
// Name used both as the schema and as the app of every stream handled by RtpProcess.
static constexpr char kRtpAppName[] = "rtp";
// Before the _muxer object is created (i.e. before publish authentication succeeds) frames
// have to be cached, which avoids dropping data and improves the user experience.
// The cache length must be bounded at the same time to prevent unbounded memory growth.
// 200 frames is roughly 10 seconds of data, which should be enough to wait for the
// authentication hook to return.
static constexpr size_t kMaxCachedFrame = 200;
2019-12-05 19:20:12 +08:00
2020-10-24 23:33:13 +08:00
namespace mediakit {
2019-12-05 19:20:12 +08:00
/**
 * Builds the media description for an RTP-pushed stream and, when a dump
 * directory is configured, opens the two debug dump files.
 *
 * @param stream_id  stream identifier; also used as the base name of the dump files
 *                   ("<stream_id>.rtp" and "<stream_id>.video").
 */
RtpProcess::RtpProcess(const string &stream_id) {
    _media_info._schema = kRtpAppName;
    _media_info._vhost = DEFAULT_VHOST;
    _media_info._app = kRtpAppName;
    _media_info._streamid = stream_id;

    GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
    if (dump_dir.empty()) {
        // Dumping is disabled: leave both file handles empty.
        return;
    }

    // Creates "<dump_dir>/<stream_id><suffix>" for binary writing; yields nullptr on failure.
    auto open_dump = [&](const char *suffix) -> FILE * {
        auto path = File::absolutePath(_media_info._streamid + suffix, dump_dir);
        return File::create_file(path.data(), "wb");
    };
    // Deleter handed to the owning pointers so the files are closed with the last reference.
    auto close_dump = [](FILE *file) {
        fclose(file);
    };

    if (FILE *rtp_file = open_dump(".rtp")) {
        _save_file_rtp.reset(rtp_file, close_dump);
    }
    if (FILE *video_file = open_dump(".video")) {
        _save_file_video.reset(video_file, close_dump);
    }
}
/**
 * Logs the disconnect and, if enough traffic was received, broadcasts a
 * flow-report event.
 */
RtpProcess::~RtpProcess() {
    // Reported session duration in seconds.
    // NOTE(review): presumably createdTime() - elapsedTime() is the span between the
    // ticker's creation and its last reset (i.e. the last frame) — confirm against Ticker.
    uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000;
    WarnP(this) << "RTP推流器("
                << _media_info._vhost << "/"
                << _media_info._app << "/"
                << _media_info._streamid
                << ")断开,耗时(s):" << duration;

    // Broadcast the traffic statistics event.
    GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
    // Widen before scaling: a 32-bit `iFlowThreshold * 1024` wraps around for
    // thresholds of 4 GiB and above, which would make the comparison meaningless.
    const uint64_t threshold_bytes = static_cast<uint64_t>(iFlowThreshold) * 1024;
    if (_total_bytes >= threshold_bytes) {
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
    }
}
/**
 * Feeds one RTP packet into the process.
 *
 * @param is_udp   forwarded unchanged to the underlying packet processor.
 * @param sock     socket the packet arrived on; remembered on the first call only.
 * @param data     packet bytes.
 * @param len      number of bytes in `data`.
 * @param addr     peer address; dereferenced on the first call, so it must be non-null then.
 * @param dts_out  optional; when non-null receives the current value of `_dts`.
 * @return false if another thread is already inside this function or the packet was
 *         discarded, otherwise the result of the underlying processor.
 */
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint32_t *dts_out) {
    if (_busy_flag.test_and_set()) {
        // Another thread is currently executing this function.
        WarnP(this) << "其他线程正在执行本函数";
        return false;
    }
    // No other thread is in here; release the busy state when this function returns.
    onceToken release_busy(nullptr, [&]() {
        _busy_flag.clear();
    });

    if (!_sock) {
        // First time this function runs: remember the peer and start publish authentication.
        _sock = sock;
        _addr = *addr;
        emitOnPublish();
    }

    _total_bytes += len;
    if (_save_file_rtp) {
        // Dump record: 2-byte length in network byte order followed by the raw packet.
        uint16_t be_len = htons(static_cast<uint16_t>(len));
        fwrite(&be_len, sizeof(be_len), 1, _save_file_rtp.get());
        fwrite(data, len, 1, _save_file_rtp.get());
    }

    if (!_process) {
        _process = std::make_shared<GB28181Process>(_media_info, this);
    }

    GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
    const bool muxer_idle = _muxer && !_muxer->isEnabled();
    if (muxer_idle && !dts_out && dump_dir.empty()) {
        // Nobody is watching, no timestamp is requested and no debug dump is being
        // written, so the data can simply be dropped.
        _last_frame_time.resetTime();
        return false;
    }

    const bool handled = _process && _process->inputRtp(is_udp, data, len);
    if (dts_out) {
        *dts_out = _dts;
    }
    return handled;
}
/**
 * Receives one decoded frame.
 *
 * Records the frame's dts, optionally dumps video frames to the debug file, and
 * then either forwards the frame to the muxer or, while the muxer does not exist
 * yet, queues it for later replay by doCachedFunc().
 *
 * @param frame  the frame to process.
 * @return the muxer's result when a muxer exists; otherwise true if the frame was
 *         queued and false if it was dropped because the queue is full.
 */
bool RtpProcess::inputFrame(const Frame::Ptr &frame) {
    _dts = frame->dts();
    if (_save_file_video && frame->getTrackType() == TrackVideo) {
        fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
    }
    if (_muxer) {
        _last_frame_time.resetTime();
        return _muxer->inputFrame(frame);
    }

    // Take the lock before looking at the queue: every other access to _cached_func
    // (including the clear() in doCachedFunc) happens under _func_mtx, so the size
    // check must not read the container unsynchronized.
    lock_guard<recursive_mutex> lck(_func_mtx);
    if (_cached_func.size() > kMaxCachedFrame) {
        WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped";
        return false;
    }
    // Queue a cacheable version of the frame, since it is replayed later.
    auto frame_cached = Frame::getCacheAbleFrame(frame);
    _cached_func.emplace_back([this, frame_cached]() {
        _last_frame_time.resetTime();
        _muxer->inputFrame(frame_cached);
    });
    return true;
}
/**
 * Adds a track to the muxer, or queues the request until the muxer exists.
 *
 * @param track  the track to add.
 * @return the muxer's result when a muxer exists, true when the request was queued.
 */
bool RtpProcess::addTrack(const Track::Ptr &track) {
    if (!_muxer) {
        // No muxer yet: remember the request so doCachedFunc() can replay it.
        lock_guard<recursive_mutex> guard(_func_mtx);
        _cached_func.emplace_back([this, track]() {
            _muxer->addTrack(track);
        });
        return true;
    }
    return _muxer->addTrack(track);
}
void RtpProcess::addTrackCompleted() {
if (_muxer) {
_muxer->addTrackCompleted();
} else {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() {
_muxer->addTrackCompleted();
});
}
}
void RtpProcess::doCachedFunc() {
lock_guard<recursive_mutex> lck(_func_mtx);
for (auto &func : _cached_func) {
func();
}
_cached_func.clear();
}
2019-12-05 19:20:12 +08:00
bool RtpProcess::alive() {
2020-12-27 21:21:31 +08:00
if (_stop_rtp_check.load()) {
if(_last_check_alive.elapsedTime() > 5 * 60 * 1000){
//最多暂停5分钟的rtp超时检测因为NAT映射有效期一般不会太长
_stop_rtp_check = false;
} else {
return true;
}
2020-12-27 21:21:31 +08:00
}
_last_check_alive.resetTime();
2021-01-19 16:05:38 +08:00
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec)
2020-10-24 23:33:13 +08:00
if (_last_frame_time.elapsedTime() / 1000 < timeoutSec) {
2019-12-05 19:20:12 +08:00
return true;
}
return false;
}
void RtpProcess::setStopCheckRtp(bool is_check){
_stop_rtp_check = is_check;
2021-04-01 22:24:35 +08:00
if (!is_check) {
_last_frame_time.resetTime();
}
}
2020-10-24 23:33:13 +08:00
/**
 * Invokes the detach callback, if one has been registered via setOnDetach().
 */
void RtpProcess::onDetach() {
    if (!_on_detach) {
        return;
    }
    _on_detach();
}
// Registers the callback that onDetach() will invoke. The callback is stored by
// copy and replaces any previously registered one; an empty function disables it.
void RtpProcess::setOnDetach(const function<void()> &cb) {
_on_detach = cb;
}
2020-04-24 12:39:22 +08:00
/**
 * @return the peer IP as text; the stored peer address is interpreted as an
 *         IPv4 `sockaddr_in`.
 */
string RtpProcess::get_peer_ip() {
    struct sockaddr_in &peer = (struct sockaddr_in &) _addr;
    return SockUtil::inet_ntoa(peer.sin_addr);
}
uint16_t RtpProcess::get_peer_port() {
2021-02-04 18:20:59 +08:00
return ntohs(((struct sockaddr_in &) _addr).sin_port);
2019-12-05 19:20:12 +08:00
}
2019-12-06 11:54:10 +08:00
2020-04-24 12:39:22 +08:00
/**
 * @return the local IP reported by the socket, or "0.0.0.0" when no socket has
 *         been recorded yet.
 */
string RtpProcess::get_local_ip() {
    if (!_sock) {
        return "0.0.0.0";
    }
    return _sock->get_local_ip();
}
uint16_t RtpProcess::get_local_port() {
2020-10-24 23:33:13 +08:00
if (_sock) {
return _sock->get_local_port();
2020-04-23 23:30:24 +08:00
}
2020-04-23 23:18:24 +08:00
return 0;
}
2020-10-24 23:33:13 +08:00
/**
 * @return the stream id, which serves as this object's identifier.
 */
string RtpProcess::getIdentifier() const {
    return _media_info._streamid;
}
2021-01-17 20:15:08 +08:00
int RtpProcess::getTotalReaderCount() {
2020-04-23 23:18:24 +08:00
return _muxer ? _muxer->totalReaderCount() : 0;
2020-02-28 16:25:14 +08:00
}
2020-10-24 23:33:13 +08:00
/**
 * Installs the media-source event listener by handing it to setDelegate().
 *
 * @param listener  weak reference to the listener.
 */
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
    setDelegate(listener);
}
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
2020-09-12 19:09:56 +08:00
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
auto strong_self = weak_self.lock();
if (!strong_self) {
2020-04-23 23:18:24 +08:00
return;
}
if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strong_self->_media_info._app,
strong_self->_media_info._streamid, 0.0f,
2020-09-12 19:09:56 +08:00
true, true, enableHls, enableMP4);
strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc();
InfoP(strong_self) << "允许RTP推流";
2020-04-23 23:18:24 +08:00
} else {
WarnP(strong_self) << "禁止RTP推流:" << err;
2020-04-23 23:18:24 +08:00
}
};
//触发推流鉴权事件
2022-03-02 18:03:44 +08:00
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtp_push, _media_info, invoker, static_cast<SockInfo &>(*this));
2020-10-24 23:33:13 +08:00
if (!flag) {
2020-04-23 23:18:24 +08:00
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
2020-09-12 19:09:56 +08:00
invoker("", toHls, toMP4);
2020-04-23 23:18:24 +08:00
}
2020-02-28 16:25:14 +08:00
}
// Reports the origin type of the stream: always MediaOriginType::rtp_push.
// `sender` is not used.
MediaOriginType RtpProcess::getOriginType(MediaSource &sender) const{
return MediaOriginType::rtp_push;
}
/**
 * Builds the origin URL of the stream as "<schema>://<vhost>/<app>/<streamid>".
 *
 * @param sender  not used.
 */
string RtpProcess::getOriginUrl(MediaSource &sender) const {
    string url = _media_info._schema;
    url += "://";
    url += _media_info._vhost;
    url += "/";
    url += _media_info._app;
    url += "/";
    url += _media_info._streamid;
    return url;
}
2021-08-16 17:31:13 +08:00
// Returns this object itself as the SockInfo describing the stream's origin.
// `sender` is not used. The const_cast lets this const method call
// shared_from_this() on a non-const pointer; as with any shared_from_this() use,
// the object must already be owned by a shared_ptr.
std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {
return const_cast<RtpProcess *>(this)->shared_from_this();
}
2019-12-06 11:54:10 +08:00
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)