移除RtpSelector相关逻辑,优化代码结构

This commit is contained in:
xiongziliang 2024-06-09 10:52:10 +08:00
parent fbac3f9ba0
commit 77d2df1695
18 changed files with 174 additions and 419 deletions

@ -1 +1 @@
Subproject commit 1e1a990783c6c09452419c0aaa6d72ce02d0202b Subproject commit 5144e2aa521df6d473308bfb31172054772a634f

View File

@ -56,7 +56,7 @@ API_EXPORT void API_CALL mk_rtp_server_set_on_detach2(mk_rtp_server ctx, on_mk_r
RtpServer::Ptr *server = (RtpServer::Ptr *) ctx; RtpServer::Ptr *server = (RtpServer::Ptr *) ctx;
if (cb) { if (cb) {
std::shared_ptr<void> ptr(user_data, user_data_free ? user_data_free : [](void *) {}); std::shared_ptr<void> ptr(user_data, user_data_free ? user_data_free : [](void *) {});
(*server)->setOnDetach([cb, ptr]() { (*server)->setOnDetach([cb, ptr](const SockException &ex) {
cb(ptr.get()); cb(ptr.get());
}); });
} else { } else {

View File

@ -45,7 +45,7 @@
#include "Http/HttpRequester.h" #include "Http/HttpRequester.h"
#include "Player/PlayerProxy.h" #include "Player/PlayerProxy.h"
#include "Pusher/PusherProxy.h" #include "Pusher/PusherProxy.h"
#include "Rtp/RtpSelector.h" #include "Rtp/RtpProcess.h"
#include "Record/MP4Reader.h" #include "Record/MP4Reader.h"
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
@ -485,7 +485,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mod
auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) { auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) {
server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex);
}); });
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id](const SockException &ex) {
//设置rtp超时移除事件 //设置rtp超时移除事件
s_rtp_server.erase(stream_id); s_rtp_server.erase(stream_id);
}); });
@ -1198,8 +1198,8 @@ void installWebApi() {
api_regist("/index/api/getRtpInfo",[](API_ARGS_MAP){ api_regist("/index/api/getRtpInfo",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]);
auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto process = src ? src->getRtpProcess() : nullptr;
if (!process) { if (!process) {
val["exist"] = false; val["exist"] = false;
return; return;
@ -1438,9 +1438,10 @@ void installWebApi() {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
//只是暂停流的检查流媒体服务器做为流负载服务收流就转发RTSP/RTMP有自己暂停协议 //只是暂停流的检查流媒体服务器做为流负载服务收流就转发RTSP/RTMP有自己暂停协议
auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]);
if (rtp_process) { auto process = src ? src->getRtpProcess() : nullptr;
rtp_process->setStopCheckRtp(true); if (process) {
process->setStopCheckRtp(true);
} else { } else {
val["code"] = API::NotFound; val["code"] = API::NotFound;
} }
@ -1449,9 +1450,10 @@ void installWebApi() {
api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) { api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto src = MediaSource::find(DEFAULT_VHOST, kRtpAppName, allArgs["stream_id"]);
if (rtp_process) { auto process = src ? src->getRtpProcess() : nullptr;
rtp_process->setStopCheckRtp(false); if (process) {
process->setStopCheckRtp(false);
} else { } else {
val["code"] = API::NotFound; val["code"] = API::NotFound;
} }

View File

@ -271,9 +271,14 @@ toolkit::EventPoller::Ptr MediaSource::getOwnerPoller() {
throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed: " + getUrl()); throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed: " + getUrl());
} }
std::shared_ptr<MultiMediaSourceMuxer> MediaSource::getMuxer() { std::shared_ptr<MultiMediaSourceMuxer> MediaSource::getMuxer() const {
auto listener = _listener.lock(); auto listener = _listener.lock();
return listener ? listener->getMuxer(*this) : nullptr; return listener ? listener->getMuxer(const_cast<MediaSource&>(*this)) : nullptr;
}
std::shared_ptr<RtpProcess> MediaSource::getRtpProcess() const {
auto listener = _listener.lock();
return listener ? listener->getRtpProcess(const_cast<MediaSource&>(*this)) : nullptr;
} }
void MediaSource::onReaderChanged(int size) { void MediaSource::onReaderChanged(int size) {
@ -803,11 +808,16 @@ toolkit::EventPoller::Ptr MediaSourceEventInterceptor::getOwnerPoller(MediaSourc
throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed"); throw std::runtime_error(toolkit::demangle(typeid(*this).name()) + "::getOwnerPoller failed");
} }
std::shared_ptr<MultiMediaSourceMuxer> MediaSourceEventInterceptor::getMuxer(MediaSource &sender) { std::shared_ptr<MultiMediaSourceMuxer> MediaSourceEventInterceptor::getMuxer(MediaSource &sender) const {
auto listener = _listener.lock(); auto listener = _listener.lock();
return listener ? listener->getMuxer(sender) : nullptr; return listener ? listener->getMuxer(sender) : nullptr;
} }
std::shared_ptr<RtpProcess> MediaSourceEventInterceptor::getRtpProcess(MediaSource &sender) const {
auto listener = _listener.lock();
return listener ? listener->getRtpProcess(sender) : nullptr;
}
bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) { bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) {
auto listener = _listener.lock(); auto listener = _listener.lock();
if (!listener) { if (!listener) {

View File

@ -41,6 +41,7 @@ enum class MediaOriginType : uint8_t {
std::string getOriginTypeString(MediaOriginType type); std::string getOriginTypeString(MediaOriginType type);
class MediaSource; class MediaSource;
class RtpProcess;
class MultiMediaSourceMuxer; class MultiMediaSourceMuxer;
class MediaSourceEvent { class MediaSourceEvent {
public: public:
@ -88,7 +89,9 @@ public:
// 获取所有track相关信息 // 获取所有track相关信息
virtual std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector<Track::Ptr>(); }; virtual std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector<Track::Ptr>(); };
// 获取MultiMediaSourceMuxer对象 // 获取MultiMediaSourceMuxer对象
virtual std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) { return nullptr; } virtual std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) const { return nullptr; }
// 获取RtpProcess对象
virtual std::shared_ptr<RtpProcess> getRtpProcess(MediaSource &sender) const { return nullptr; }
class SendRtpArgs { class SendRtpArgs {
public: public:
@ -278,7 +281,8 @@ public:
bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override; bool stopSendRtp(MediaSource &sender, const std::string &ssrc) override;
float getLossRate(MediaSource &sender, TrackType type) override; float getLossRate(MediaSource &sender, TrackType type) override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) override; std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) const override;
std::shared_ptr<RtpProcess> getRtpProcess(MediaSource &sender) const override;
private: private:
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
@ -395,7 +399,9 @@ public:
// 获取所在线程 // 获取所在线程
toolkit::EventPoller::Ptr getOwnerPoller(); toolkit::EventPoller::Ptr getOwnerPoller();
// 获取MultiMediaSourceMuxer对象 // 获取MultiMediaSourceMuxer对象
std::shared_ptr<MultiMediaSourceMuxer> getMuxer(); std::shared_ptr<MultiMediaSourceMuxer> getMuxer() const;
// 获取RtpProcess对象
std::shared_ptr<RtpProcess> getRtpProcess() const;
////////////////static方法查找或生成MediaSource//////////////// ////////////////static方法查找或生成MediaSource////////////////

View File

@ -466,8 +466,8 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) {
} }
} }
std::shared_ptr<MultiMediaSourceMuxer> MultiMediaSourceMuxer::getMuxer(MediaSource &sender) { std::shared_ptr<MultiMediaSourceMuxer> MultiMediaSourceMuxer::getMuxer(MediaSource &sender) const {
return shared_from_this(); return const_cast<MultiMediaSourceMuxer*>(this)->shared_from_this();
} }
bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {

View File

@ -127,7 +127,7 @@ public:
/** /**
* *
*/ */
std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) override; std::shared_ptr<MultiMediaSourceMuxer> getMuxer(MediaSource &sender) const override;
const ProtocolOption &getOption() const; const ProtocolOption &getOption() const;
const MediaTuple &getMediaTuple() const; const MediaTuple &getMediaTuple() const;

View File

@ -11,26 +11,29 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "GB28181Process.h" #include "GB28181Process.h"
#include "RtpProcess.h" #include "RtpProcess.h"
#include "RtpSelector.h"
#include "Http/HttpTSPlayer.h"
#include "Util/File.h" #include "Util/File.h"
#include "Common/config.h" #include "Common/config.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
static constexpr char kRtpAppName[] = "rtp";
//在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验 //在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验
//但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回 //但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回
static constexpr size_t kMaxCachedFrame = 200; static constexpr size_t kMaxCachedFrame = 200;
namespace mediakit { namespace mediakit {
RtpProcess::RtpProcess(const string &stream_id) { RtpProcess::Ptr RtpProcess::createProcess(std::string stream_id) {
RtpProcess::Ptr ret(new RtpProcess(std::move(stream_id)));
ret->createTimer();
return ret;
}
RtpProcess::RtpProcess(string stream_id) {
_media_info.schema = kRtpAppName; _media_info.schema = kRtpAppName;
_media_info.vhost = DEFAULT_VHOST; _media_info.vhost = DEFAULT_VHOST;
_media_info.app = kRtpAppName; _media_info.app = kRtpAppName;
_media_info.stream = stream_id; _media_info.stream = std::move(stream_id);
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
{ {
@ -75,6 +78,25 @@ RtpProcess::~RtpProcess() {
} }
} }
void RtpProcess::onManager() {
if (!alive()) {
onDetach(SockException(Err_timeout, "RtpProcess timeout"));
}
}
void RtpProcess::createTimer() {
//创建超时管理定时器
weak_ptr<RtpProcess> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(3.0f, [weakSelf] {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
strongSelf->onManager();
return true;
}, EventPollerPool::Instance().getPoller());
}
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) {
if (!isRtp(data, len)) { if (!isRtp(data, len)) {
WarnP(this) << "Not rtp packet"; WarnP(this) << "Not rtp packet";
@ -203,13 +225,14 @@ void RtpProcess::setOnlyTrack(OnlyTrack only_track) {
_only_track = only_track; _only_track = only_track;
} }
void RtpProcess::onDetach() { void RtpProcess::onDetach(const SockException &ex) {
if (_on_detach) { if (_on_detach) {
_on_detach(); WarnL << ex << ", stream_id: " << getIdentifier();
_on_detach(ex);
} }
} }
void RtpProcess::setOnDetach(function<void()> cb) { void RtpProcess::setOnDetach(onDetachCB cb) {
_on_detach = std::move(cb); _on_detach = std::move(cb);
} }
@ -256,9 +279,6 @@ void RtpProcess::emitOnPublish() {
} }
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, option); strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, option);
if (!option.stream_replace.empty()) {
RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace);
}
switch (strong_self->_only_track) { switch (strong_self->_only_track) {
case kOnlyAudio: strong_self->_muxer->setOnlyAudio(); break; case kOnlyAudio: strong_self->_muxer->setOnlyAudio(); break;
case kOnlyVideo: strong_self->_muxer->enableAudio(false); break; case kOnlyVideo: strong_self->_muxer->enableAudio(false); break;
@ -294,6 +314,15 @@ std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {
return const_cast<RtpProcess *>(this)->shared_from_this(); return const_cast<RtpProcess *>(this)->shared_from_this();
} }
RtpProcess::Ptr RtpProcess::getRtpProcess(mediakit::MediaSource &sender) const {
return const_cast<RtpProcess *>(this)->shared_from_this();
}
bool RtpProcess::close(mediakit::MediaSource &sender) {
onDetach(SockException(Err_shutdown, "close media"));
return true;
}
toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) {
if (_sock) { if (_sock) {
return _sock->getPoller(); return _sock->getPoller();

View File

@ -18,11 +18,14 @@
namespace mediakit { namespace mediakit {
static constexpr char kRtpAppName[] = "rtp";
class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this<RtpProcess>{ class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this<RtpProcess>{
public: public:
using Ptr = std::shared_ptr<RtpProcess>; using Ptr = std::shared_ptr<RtpProcess>;
friend class RtpProcessHelper; using onDetachCB = std::function<void(const toolkit::SockException &ex)>;
RtpProcess(const std::string &stream_id);
static Ptr createProcess(std::string stream_id);
~RtpProcess(); ~RtpProcess();
enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 }; enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 };
@ -38,20 +41,16 @@ public:
*/ */
bool inputRtp(bool is_udp, const toolkit::Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr , uint64_t *dts_out = nullptr); bool inputRtp(bool is_udp, const toolkit::Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr , uint64_t *dts_out = nullptr);
/**
*
*/
bool alive();
/** /**
* RtpSelector移除时触发 * RtpSelector移除时触发
*/ */
void onDetach(); void onDetach(const toolkit::SockException &ex);
/** /**
* onDetach事件回调 * onDetach事件回调
*/ */
void setOnDetach(std::function<void()> cb); void setOnDetach(onDetachCB cb);
/** /**
* onDetach事件回调,false检查RTP超时true停止 * onDetach事件回调,false检查RTP超时true停止
@ -88,10 +87,17 @@ protected:
std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
float getLossRate(MediaSource &sender, TrackType type) override; float getLossRate(MediaSource &sender, TrackType type) override;
Ptr getRtpProcess(mediakit::MediaSource &sender) const override;
bool close(mediakit::MediaSource &sender) override;
private: private:
RtpProcess(std::string stream_id);
void emitOnPublish(); void emitOnPublish();
void doCachedFunc(); void doCachedFunc();
bool alive();
void onManager();
void createTimer();
private: private:
OnlyTrack _only_track = kAll; OnlyTrack _only_track = kAll;
@ -102,12 +108,13 @@ private:
toolkit::Socket::Ptr _sock; toolkit::Socket::Ptr _sock;
MediaInfo _media_info; MediaInfo _media_info;
toolkit::Ticker _last_frame_time; toolkit::Ticker _last_frame_time;
std::function<void()> _on_detach; onDetachCB _on_detach;
std::shared_ptr<FILE> _save_file_rtp; std::shared_ptr<FILE> _save_file_rtp;
std::shared_ptr<FILE> _save_file_video; std::shared_ptr<FILE> _save_file_video;
ProcessInterface::Ptr _process; ProcessInterface::Ptr _process;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
std::atomic_bool _stop_rtp_check{false}; std::atomic_bool _stop_rtp_check{false};
toolkit::Timer::Ptr _timer;
toolkit::Ticker _last_check_alive; toolkit::Ticker _last_check_alive;
std::recursive_mutex _func_mtx; std::recursive_mutex _func_mtx;
std::deque<std::function<void()> > _cached_func; std::deque<std::function<void()> > _cached_func;

View File

@ -1,168 +0,0 @@
/*
* Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
*
* Use of this source code is governed by MIT-like license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include <stddef.h>
#include "RtpSelector.h"
#include "RtpSplitter.h"
using namespace std;
using namespace toolkit;
namespace mediakit{
INSTANCE_IMP(RtpSelector);
void RtpSelector::clear(){
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
_map_rtp_process.clear();
_map_stream_replace.clear();
}
bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
if (data_len < 12) {
return false;
}
uint32_t *ssrc_ptr = (uint32_t *) (data + 8);
ssrc = ntohl(*ssrc_ptr);
return true;
}
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
string stream_id_origin = stream_id;
auto it_replace = _map_stream_replace.find(stream_id);
if (it_replace != _map_stream_replace.end()) {
stream_id_origin = it_replace->second;
}
auto it = _map_rtp_process.find(stream_id_origin);
if (it == _map_rtp_process.end() && !makeNew) {
return nullptr;
}
if (it != _map_rtp_process.end() && makeNew) {
//已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题
throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id_origin << ") already existed");
}
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin];
if (!ref) {
ref = std::make_shared<RtpProcessHelper>(stream_id_origin, shared_from_this());
ref->attachEvent();
createTimer();
}
return ref->getProcess();
}
void RtpSelector::createTimer() {
if (!_timer) {
//创建超时管理定时器
weak_ptr<RtpSelector> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>(3.0f, [weakSelf] {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
strongSelf->onManager();
return true;
}, EventPollerPool::Instance().getPoller());
}
}
void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
RtpProcess::Ptr process;
{
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
auto it = _map_rtp_process.find(stream_id);
if (it == _map_rtp_process.end()) {
return;
}
if (it->second->getProcess().get() != ptr) {
return;
}
process = it->second->getProcess();
_map_rtp_process.erase(it);
delStreamReplace(stream_id);
}
process->onDetach();
}
void RtpSelector::addStreamReplace(const string &stream_id, const std::string &stream_replace) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
_map_stream_replace[stream_replace] = stream_id;
}
void RtpSelector::delStreamReplace(const string &stream_id) {
for (auto it = _map_stream_replace.begin(); it != _map_stream_replace.end(); ++it) {
if (it->second == stream_id) {
_map_stream_replace.erase(it);
break;
}
}
}
void RtpSelector::onManager() {
List<RtpProcess::Ptr> clear_list;
{
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) {
if (it->second->getProcess()->alive()) {
++it;
continue;
}
WarnL << "RtpProcess timeout:" << it->first;
clear_list.emplace_back(it->second->getProcess());
delStreamReplace(it->first);
it = _map_rtp_process.erase(it);
}
}
clear_list.for_each([](const RtpProcess::Ptr &process) {
process->onDetach();
});
}
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
_stream_id = stream_id;
_parent = parent;
_process = std::make_shared<RtpProcess>(stream_id);
}
RtpProcessHelper::~RtpProcessHelper() {
auto process = std::move(_process);
try {
// flush时确保线程安全
process->getOwnerPoller(MediaSource::NullMediaSource())->async([process]() { process->flush(); });
} catch (...) {
// 忽略getOwnerPoller可能抛出的异常
}
}
void RtpProcessHelper::attachEvent() {
//主要目的是close回调触发时能把对象从RtpSelector中删除
_process->setDelegate(shared_from_this());
}
bool RtpProcessHelper::close(MediaSource &sender) {
//此回调在其他线程触发
auto parent = _parent.lock();
if (!parent) {
return false;
}
parent->delProcess(_stream_id, _process.get());
WarnL << "close media: " << sender.getUrl();
return true;
}
RtpProcess::Ptr &RtpProcessHelper::getProcess() {
return _process;
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)

View File

@ -1,89 +0,0 @@
/*
* Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
*
* Use of this source code is governed by MIT-like license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_RTPSELECTOR_H
#define ZLMEDIAKIT_RTPSELECTOR_H
#if defined(ENABLE_RTPPROXY)
#include <stdint.h>
#include <mutex>
#include <unordered_map>
#include "RtpProcess.h"
#include "Common/MediaSource.h"
namespace mediakit{
class RtpSelector;
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
public:
using Ptr = std::shared_ptr<RtpProcessHelper>;
RtpProcessHelper(const std::string &stream_id, const std::weak_ptr<RtpSelector > &parent);
~RtpProcessHelper();
void attachEvent();
RtpProcess::Ptr & getProcess();
protected:
// 通知其停止推流
bool close(MediaSource &sender) override;
private:
std::string _stream_id;
RtpProcess::Ptr _process;
std::weak_ptr<RtpSelector> _parent;
};
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
public:
class ProcessExisted : public std::runtime_error {
public:
template<typename ...T>
ProcessExisted(T && ...args) : std::runtime_error(std::forward<T>(args)...) {}
};
static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc);
static RtpSelector &Instance();
/**
*
*/
void clear();
/**
* rtp处理器
* @param stream_id id
* @param makeNew , true时
* @return rtp处理器
*/
RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew);
/**
* rtp处理器
* @param stream_id id
* @param ptr rtp处理器指针
*/
void delProcess(const std::string &stream_id, const RtpProcess *ptr);
void addStreamReplace(const std::string &stream_id, const std::string &stream_replace);
private:
void onManager();
void createTimer();
void delStreamReplace(const std::string &stream_id);
private:
toolkit::Timer::Ptr _timer;
std::recursive_mutex _mtx_map;
std::unordered_map<std::string,RtpProcessHelper::Ptr> _map_rtp_process;
std::unordered_map<std::string,std::string> _map_stream_replace;
};
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_RTPSELECTOR_H

View File

@ -11,7 +11,7 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "Util/uv_errno.h" #include "Util/uv_errno.h"
#include "RtpServer.h" #include "RtpServer.h"
#include "RtpSelector.h" #include "RtpProcess.h"
#include "Rtcp/RtcpContext.h" #include "Rtcp/RtcpContext.h"
#include "Common/config.h" #include "Common/config.h"
@ -35,38 +35,34 @@ public:
_stream_id = std::move(stream_id); _stream_id = std::move(stream_id);
} }
~RtcpHelper() {
if (_process) {
// 删除rtp处理器
RtpSelector::Instance().delProcess(_stream_id, _process.get());
}
}
void setRtpServerInfo(uint16_t local_port, RtpServer::TcpMode mode, bool re_use_port, uint32_t ssrc, int only_track) { void setRtpServerInfo(uint16_t local_port, RtpServer::TcpMode mode, bool re_use_port, uint32_t ssrc, int only_track) {
_local_port = local_port;
_tcp_mode = mode;
_re_use_port = re_use_port;
_ssrc = ssrc; _ssrc = ssrc;
_only_track = only_track; _process = RtpProcess::createProcess(_stream_id);
_process->setOnlyTrack((RtpProcess::OnlyTrack)only_track);
_timeout_cb = [=]() mutable {
NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, local_port, _stream_id, (int)mode, re_use_port, ssrc);
};
weak_ptr<RtcpHelper> weak_self = shared_from_this();
_process->setOnDetach([weak_self](const SockException &ex) {
if (auto strong_self = weak_self.lock()) {
if (strong_self->_on_detach) {
strong_self->_on_detach(ex);
}
if (ex.getErrCode() == Err_timeout) {
strong_self->_timeout_cb();
}
}
});
} }
void setOnDetach(function<void()> cb) { void setOnDetach(RtpProcess::onDetachCB cb) { _on_detach = std::move(cb); }
if (_process) {
_process->setOnDetach(std::move(cb)); RtpProcess::Ptr getProcess() const { return _process; }
} else {
_on_detach = std::move(cb);
}
}
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setOnDetach(std::move(_on_detach));
cancelDelayTask();
}
_process->inputRtp(true, sock, buf->data(), buf->size(), addr); _process->inputRtp(true, sock, buf->data(), buf->size(), addr);
// 统计rtp接受情况用于发送rr包 // 统计rtp接受情况用于发送rr包
auto header = (RtpHeader *)buf->data(); auto header = (RtpHeader *)buf->data();
sendRtcp(ntohl(header->ssrc), addr); sendRtcp(ntohl(header->ssrc), addr);
@ -92,37 +88,12 @@ public:
// 收到sr rtcp后驱动返回rr rtcp // 收到sr rtcp后驱动返回rr rtcp
strong_self->sendRtcp(strong_self->_ssrc, (struct sockaddr *)(strong_self->_rtcp_addr.get())); strong_self->sendRtcp(strong_self->_ssrc, (struct sockaddr *)(strong_self->_rtcp_addr.get()));
}); });
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec);
_delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() {
if (auto strong_self = weak_self.lock()) {
auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false);
if (!process && strong_self->_on_detach) {
strong_self->_on_detach();
}
if(process && strong_self->_on_detach){// tcp 链接防止断开不删除rtpServer
process->setOnDetach(std::move(strong_self->_on_detach));
}
if (!process) { // process 未创建触发rtp server 超时事件
NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, strong_self->_local_port, strong_self->_stream_id,
(int)strong_self->_tcp_mode, strong_self->_re_use_port, strong_self->_ssrc);
}
}
return 0;
});
}
void cancelDelayTask() {
if (_delay_task) {
_delay_task->cancel();
_delay_task = nullptr;
}
} }
private: private:
void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) {
// 每5秒发送一次rtcp // 每5秒发送一次rtcp
if (_ticker.elapsedTime() < 5000 || !_process) { if (_ticker.elapsedTime() < 5000) {
return; return;
} }
_ticker.resetTime(); _ticker.resetTime();
@ -141,19 +112,14 @@ private:
} }
private: private:
bool _re_use_port = false;
int _only_track = 0;
uint16_t _local_port = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
RtpServer::TcpMode _tcp_mode = RtpServer::NONE; std::function<void()> _timeout_cb;
Ticker _ticker; Ticker _ticker;
Socket::Ptr _rtcp_sock; Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
std::string _stream_id; std::string _stream_id;
function<void()> _on_detach; RtpProcess::onDetachCB _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr; std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
EventPoller::DelayTask::Ptr _delay_task;
}; };
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
@ -186,7 +152,12 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
(*tcp_server)[RtpSession::kSSRC] = ssrc; (*tcp_server)[RtpSession::kSSRC] = ssrc;
(*tcp_server)[RtpSession::kOnlyTrack] = only_track; (*tcp_server)[RtpSession::kOnlyTrack] = only_track;
if (tcp_mode == PASSIVE) { if (tcp_mode == PASSIVE) {
tcp_server->start<RtpSession>(local_port, local_ip); weak_ptr<RtpServer> weak_self = shared_from_this();
tcp_server->start<RtpSession>(local_port, local_ip, 1024, [weak_self](std::shared_ptr<RtpSession> &session) {
if (auto strong_self = weak_self.lock()) {
session->setRtpProcess(strong_self->_rtcp_helper->getProcess());
}
});
} else if (stream_id.empty()) { } else if (stream_id.empty()) {
// tcp主动模式时只能一个端口一个流必须指定流id; 创建TcpServer对象也仅用于传参 // tcp主动模式时只能一个端口一个流必须指定流id; 创建TcpServer对象也仅用于传参
throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id"); throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id");
@ -242,7 +213,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
_rtcp_helper = helper; _rtcp_helper = helper;
} }
void RtpServer::setOnDetach(function<void()> cb) { void RtpServer::setOnDetach(RtpProcess::onDetachCB cb) {
if (_rtcp_helper) { if (_rtcp_helper) {
_rtcp_helper->setOnDetach(std::move(cb)); _rtcp_helper->setOnDetach(std::move(cb));
} }
@ -277,6 +248,7 @@ void RtpServer::connectToServer(const std::string &url, uint16_t port, const fun
void RtpServer::onConnect() { void RtpServer::onConnect() {
auto rtp_session = std::make_shared<RtpSession>(_rtp_socket); auto rtp_session = std::make_shared<RtpSession>(_rtp_socket);
rtp_session->setRtpProcess(_rtcp_helper->getProcess());
rtp_session->attachServer(*_tcp_server); rtp_session->attachServer(*_tcp_server);
_rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { _rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
rtp_session->onRecv(buf); rtp_session->onRecv(buf);

View File

@ -62,7 +62,7 @@ public:
/** /**
* RtpProcess onDetach事件回调 * RtpProcess onDetach事件回调
*/ */
void setOnDetach(std::function<void()> cb); void setOnDetach(RtpProcess::onDetachCB cb);
/** /**
* ssrc * ssrc

View File

@ -10,7 +10,7 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "RtpSession.h" #include "RtpSession.h"
#include "RtpSelector.h" #include "RtpProcess.h"
#include "Network/TcpServer.h" #include "Network/TcpServer.h"
#include "Rtsp/Rtsp.h" #include "Rtsp/Rtsp.h"
#include "Rtsp/RtpReceiver.h" #include "Rtsp/RtpReceiver.h"
@ -60,28 +60,24 @@ void RtpSession::onRecv(const Buffer::Ptr &data) {
} }
void RtpSession::onError(const SockException &err) { void RtpSession::onError(const SockException &err) {
WarnP(this) << _stream_id << " " << err; if (_emit_detach) {
if (_process) { _process->onDetach(err);
RtpSelector::Instance().delProcess(_stream_id, _process.get());
_process = nullptr;
} }
WarnP(this) << _stream_id << " " << err;
} }
void RtpSession::onManager() { void RtpSession::onManager() {
if (_process && !_process->alive()) {
shutdown(SockException(Err_timeout, "receive rtp timeout"));
}
if (!_process && _ticker.createdTime() > 10 * 1000) { if (!_process && _ticker.createdTime() > 10 * 1000) {
shutdown(SockException(Err_timeout, "illegal connection")); shutdown(SockException(Err_timeout, "illegal connection"));
} }
} }
void RtpSession::onRtpPacket(const char *data, size_t len) { void RtpSession::setRtpProcess(RtpProcess::Ptr process) {
if (_delay_close) { _emit_detach = true;
// 正在延时关闭中,忽略所有数据 _process = std::move(process);
return;
} }
void RtpSession::onRtpPacket(const char *data, size_t len) {
if (!isRtp(data, len)) { if (!isRtp(data, len)) {
// 忽略非rtp数据 // 忽略非rtp数据
WarnP(this) << "Not rtp packet"; WarnP(this) << "Not rtp packet";
@ -104,33 +100,31 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
return; return;
} }
} }
if (!_process) {
// 未设置ssrc时尝试获取ssrc // 未设置ssrc时尝试获取ssrc
if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) { if (!_ssrc && !getSSRC(data, len, _ssrc)) {
return; return;
} }
if (_stream_id.empty()) {
// 未指定流id就使用ssrc为流id // 未指定流id就使用ssrc为流id
if (_stream_id.empty()) {
_stream_id = printSSRC(_ssrc); _stream_id = printSSRC(_ssrc);
} }
try {
_process = RtpSelector::Instance().getProcess(_stream_id, true); if (!_process) {
} catch (RtpSelector::ProcessExisted &ex) { _process = RtpProcess::createProcess(_stream_id);
if (!_is_udp) {
// tcp情况下立即断开连接
throw;
}
// udp情况下延时断开连接(等待超时自动关闭)防止频繁创建销毁RtpSession对象
WarnP(this) << ex.what();
_delay_close = true;
return;
}
_process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setDelegate(static_pointer_cast<RtpSession>(shared_from_this())); weak_ptr<RtpSession> weak_self = static_pointer_cast<RtpSession>(shared_from_this());
_process->setOnDetach([weak_self](const SockException &ex) {
if (auto strong_self = weak_self.lock()) {
strong_self->_process = nullptr;
strong_self->shutdown(ex);
}
});
} }
try { try {
uint32_t rtp_ssrc = 0; uint32_t rtp_ssrc = 0;
RtpSelector::getSSRC(data, len, rtp_ssrc); getSSRC(data, len, rtp_ssrc);
if (rtp_ssrc != _ssrc) { if (rtp_ssrc != _ssrc) {
WarnP(this) << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << _ssrc; WarnP(this) << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << _ssrc;
return; return;
@ -143,26 +137,10 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} else { } else {
throw; throw;
} }
} catch (std::exception &ex) {
if (!_is_udp) {
// tcp情况下立即断开连接
throw;
}
// udp情况下延时断开连接(等待超时自动关闭)防止频繁创建销毁RtpSession对象
WarnP(this) << ex.what();
_delay_close = true;
return;
} }
_ticker.resetTime(); _ticker.resetTime();
} }
bool RtpSession::close(MediaSource &sender) {
//此回调在其他线程触发
string err = StrPrinter << "close media: " << sender.getUrl();
safeShutdown(SockException(Err_shutdown, err));
return true;
}
static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) { static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) {
// rtp前面必须预留两个字节的长度字段 // rtp前面必须预留两个字节的长度字段
for (ssize_t i = 2; i <= len - 4; ++i) { for (ssize_t i = 2; i <= len - 4; ++i) {
@ -268,7 +246,7 @@ const char *RtpSession::searchByPsHeaderFlag(const char *data, size_t len) {
// TODO or Not ? 更新设置ssrc // TODO or Not ? 更新设置ssrc
uint32_t rtp_ssrc = 0; uint32_t rtp_ssrc = 0;
RtpSelector::getSSRC(rtp_ptr + 2, len, rtp_ssrc); getSSRC(rtp_ptr + 2, len, rtp_ssrc);
_ssrc = rtp_ssrc; _ssrc = rtp_ssrc;
InfoL << "设置_ssrc为" << _ssrc; InfoL << "设置_ssrc为" << _ssrc;
// RtpServer::updateSSRC(uint32_t ssrc) // RtpServer::updateSSRC(uint32_t ssrc)

View File

@ -20,7 +20,7 @@
namespace mediakit{ namespace mediakit{
class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent { class RtpSession : public toolkit::Session, public RtpSplitter {
public: public:
static const std::string kStreamID; static const std::string kStreamID;
static const std::string kSSRC; static const std::string kSSRC;
@ -34,10 +34,9 @@ public:
void onManager() override; void onManager() override;
void setParams(toolkit::mINI &ini); void setParams(toolkit::mINI &ini);
void attachServer(const toolkit::Server &server) override; void attachServer(const toolkit::Server &server) override;
void setRtpProcess(RtpProcess::Ptr process);
protected: protected:
// 通知其停止推流
bool close(MediaSource &sender) override;
// 收到rtp回调 // 收到rtp回调
void onRtpPacket(const char *data, size_t len) override; void onRtpPacket(const char *data, size_t len) override;
// RtpSplitter override // RtpSplitter override
@ -48,10 +47,10 @@ protected:
const char *searchByPsHeaderFlag(const char *data, size_t len); const char *searchByPsHeaderFlag(const char *data, size_t len);
private: private:
bool _delay_close = false;
bool _is_udp = false; bool _is_udp = false;
bool _search_rtp = false; bool _search_rtp = false;
bool _search_rtp_finished = false; bool _search_rtp_finished = false;
bool _emit_detach = false;
int _only_track = 0; int _only_track = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
toolkit::Ticker _ticker; toolkit::Ticker _ticker;

View File

@ -470,6 +470,15 @@ string printSSRC(uint32_t ui32Ssrc) {
return tmp; return tmp;
} }
bool getSSRC(const char *data, size_t data_len, uint32_t &ssrc) {
if (data_len < 12) {
return false;
}
uint32_t *ssrc_ptr = (uint32_t *)(data + 8);
ssrc = ntohl(*ssrc_ptr);
return true;
}
bool isRtp(const char *buf, size_t size) { bool isRtp(const char *buf, size_t size) {
if (size < 2) { if (size < 2) {
return false; return false;

View File

@ -317,6 +317,7 @@ toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved);
void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true); void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
// 十六进制方式打印ssrc // 十六进制方式打印ssrc
std::string printSSRC(uint32_t ui32Ssrc); std::string printSSRC(uint32_t ui32Ssrc);
bool getSSRC(const char *data, size_t data_len, uint32_t &ssrc);
bool isRtp(const char *buf, size_t size); bool isRtp(const char *buf, size_t size);
bool isRtcp(const char *buf, size_t size); bool isRtcp(const char *buf, size_t size);

View File

@ -17,7 +17,7 @@
#include "Rtsp/RtspSession.h" #include "Rtsp/RtspSession.h"
#include "Rtmp/RtmpSession.h" #include "Rtmp/RtmpSession.h"
#include "Http/HttpSession.h" #include "Http/HttpSession.h"
#include "Rtp/RtpSelector.h" #include "Rtp/RtpProcess.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) {
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET; addr.ss_family = AF_INET;
auto sock = Socket::createSocket(poller); auto sock = Socket::createSocket(poller);
auto process = RtpSelector::Instance().getProcess("test", true); auto process = RtpProcess::createProcess("test");
uint64_t stamp_last = 0; uint64_t stamp_last = 0;
auto total_size = std::make_shared<size_t>(0); auto total_size = std::make_shared<size_t>(0);
@ -89,7 +89,6 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) {
auto ret = do_read(); auto ret = do_read();
if (!ret) { if (!ret) {
WarnL << *total_size / 1024 << "KB"; WarnL << *total_size / 1024 << "KB";
RtpSelector::Instance().delProcess("test", process.get());
} }
return ret; return ret;
}); });