mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-22 10:40:05 +08:00
RTP代理支持完整的事件
This commit is contained in:
parent
5acdf1f789
commit
109fab2cb1
@ -299,5 +299,13 @@ uint16_t RtpProcess::get_peer_port() {
|
||||
return ntohs(((struct sockaddr_in *) _addr)->sin_port);
|
||||
}
|
||||
|
||||
int RtpProcess::totalReaderCount(){
|
||||
return _muxer->totalReaderCount();
|
||||
}
|
||||
|
||||
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
|
||||
_muxer->setListener(listener);
|
||||
}
|
||||
|
||||
}//namespace mediakit
|
||||
#endif//defined(ENABLE_RTPPROXY)
|
@ -49,6 +49,9 @@ public:
|
||||
bool alive();
|
||||
string get_peer_ip();
|
||||
uint16_t get_peer_port();
|
||||
|
||||
int totalReaderCount();
|
||||
void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
|
||||
protected:
|
||||
void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ;
|
||||
void onRtpDecode(const void *packet, int bytes, uint32_t timestamp, int flags) override;
|
||||
|
@ -63,11 +63,12 @@ RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) {
|
||||
if(it == _map_rtp_process.end() && !makeNew){
|
||||
return nullptr;
|
||||
}
|
||||
RtpProcess::Ptr &ref = _map_rtp_process[ssrc];
|
||||
RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc];
|
||||
if(!ref){
|
||||
ref = std::make_shared<RtpProcess>(ssrc);
|
||||
ref = std::make_shared<RtpProcessHelper>(ssrc,shared_from_this());
|
||||
ref->attachEvent();
|
||||
}
|
||||
return ref;
|
||||
return ref->getProcess();
|
||||
}
|
||||
|
||||
void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) {
|
||||
@ -77,7 +78,7 @@ void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
if(it->second.get() != ptr){
|
||||
if(it->second->getProcess().get() != ptr){
|
||||
return;
|
||||
}
|
||||
|
||||
@ -87,7 +88,7 @@ void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) {
|
||||
void RtpSelector::onManager() {
|
||||
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
|
||||
for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) {
|
||||
if (it->second->alive()) {
|
||||
if (it->second->getProcess()->alive()) {
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
@ -102,5 +103,47 @@ RtpSelector::RtpSelector() {
|
||||
RtpSelector::~RtpSelector() {
|
||||
}
|
||||
|
||||
RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr<RtpSelector> &parent) {
|
||||
_ssrc = ssrc;
|
||||
_parent = parent;
|
||||
_process = std::make_shared<RtpProcess>(_ssrc);
|
||||
}
|
||||
|
||||
RtpProcessHelper::~RtpProcessHelper() {
|
||||
}
|
||||
|
||||
void RtpProcessHelper::attachEvent() {
|
||||
_process->setListener(shared_from_this());
|
||||
}
|
||||
|
||||
bool RtpProcessHelper::close(MediaSource &sender, bool force) {
|
||||
//此回调在其他线程触发
|
||||
if(!_process || (!force && _process->totalReaderCount())){
|
||||
return false;
|
||||
}
|
||||
auto parent = _parent.lock();
|
||||
if(!parent){
|
||||
return false;
|
||||
}
|
||||
parent->delProcess(_ssrc,_process.get());
|
||||
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
|
||||
return true;
|
||||
}
|
||||
|
||||
void RtpProcessHelper::onNoneReader(MediaSource &sender) {
|
||||
if(!_process || _process->totalReaderCount()){
|
||||
return;
|
||||
}
|
||||
MediaSourceEvent::onNoneReader(sender);
|
||||
}
|
||||
|
||||
int RtpProcessHelper::totalReaderCount(MediaSource &sender) {
|
||||
return _process ? _process->totalReaderCount() : sender.totalReaderCount();
|
||||
}
|
||||
|
||||
RtpProcess::Ptr &RtpProcessHelper::getProcess() {
|
||||
return _process;
|
||||
}
|
||||
|
||||
}//namespace mediakit
|
||||
#endif//defined(ENABLE_RTPPROXY)
|
@ -32,9 +32,31 @@
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include "RtpProcess.h"
|
||||
#include "Common/MediaSource.h"
|
||||
|
||||
namespace mediakit{
|
||||
|
||||
class RtpSelector;
|
||||
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
|
||||
public:
|
||||
typedef std::shared_ptr<RtpProcessHelper> Ptr;
|
||||
RtpProcessHelper(uint32_t ssrc,const weak_ptr<RtpSelector > &parent);
|
||||
~RtpProcessHelper();
|
||||
void attachEvent();
|
||||
RtpProcess::Ptr & getProcess();
|
||||
protected:
|
||||
// 通知其停止推流
|
||||
bool close(MediaSource &sender,bool force) override;
|
||||
// 通知无人观看
|
||||
void onNoneReader(MediaSource &sender) override;
|
||||
// 观看总人数
|
||||
int totalReaderCount(MediaSource &sender) override;
|
||||
private:
|
||||
weak_ptr<RtpSelector > _parent;
|
||||
RtpProcess::Ptr _process;
|
||||
uint32_t _ssrc = 0;
|
||||
};
|
||||
|
||||
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
|
||||
public:
|
||||
RtpSelector();
|
||||
@ -48,7 +70,7 @@ public:
|
||||
private:
|
||||
void onManager();
|
||||
private:
|
||||
unordered_map<uint32_t,RtpProcess::Ptr> _map_rtp_process;
|
||||
unordered_map<uint32_t,RtpProcessHelper::Ptr> _map_rtp_process;
|
||||
recursive_mutex _mtx_map;
|
||||
Ticker _last_rtp_time;
|
||||
};
|
||||
|
@ -71,10 +71,34 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) {
|
||||
return;
|
||||
}
|
||||
_process = RtpSelector::Instance().getProcess(_ssrc, true);
|
||||
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
|
||||
}
|
||||
_process->inputRtp(data + 2, len - 2, &addr);
|
||||
_ticker.resetTime();
|
||||
}
|
||||
|
||||
bool RtpSession::close(MediaSource &sender, bool force) {
|
||||
//此回调在其他线程触发
|
||||
if(!_process || (!force && _process->totalReaderCount())){
|
||||
return false;
|
||||
}
|
||||
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
|
||||
safeShutdown(SockException(Err_shutdown,err));
|
||||
return true;
|
||||
}
|
||||
|
||||
void RtpSession::onNoneReader(MediaSource &sender) {
|
||||
//此回调在其他线程触发
|
||||
if(!_process || _process->totalReaderCount()){
|
||||
return;
|
||||
}
|
||||
MediaSourceEvent::onNoneReader(sender);
|
||||
}
|
||||
|
||||
int RtpSession::totalReaderCount(MediaSource &sender) {
|
||||
//此回调在其他线程触发
|
||||
return _process ? _process->totalReaderCount() : sender.totalReaderCount();
|
||||
}
|
||||
|
||||
}//namespace mediakit
|
||||
#endif//defined(ENABLE_RTPPROXY)
|
@ -36,14 +36,20 @@ using namespace toolkit;
|
||||
|
||||
namespace mediakit{
|
||||
|
||||
class RtpSession : public TcpSession , public RtpSplitter{
|
||||
class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{
|
||||
public:
|
||||
RtpSession(const Socket::Ptr &sock);
|
||||
~RtpSession() override;
|
||||
void onRecv(const Buffer::Ptr &) override;
|
||||
void onError(const SockException &err) override;
|
||||
void onManager() override;
|
||||
private:
|
||||
protected:
|
||||
// 通知其停止推流
|
||||
bool close(MediaSource &sender,bool force) override;
|
||||
// 通知无人观看
|
||||
void onNoneReader(MediaSource &sender) override;
|
||||
// 观看总人数
|
||||
int totalReaderCount(MediaSource &sender) override;
|
||||
void onRtpPacket(const char *data,uint64_t len) override;
|
||||
private:
|
||||
uint32_t _ssrc = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user