mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2024-11-26 04:31:37 +08:00
支持动态创建GB28181收流端口并可指定stream_id:#338
This commit is contained in:
parent
30260e5414
commit
477f99b756
@ -741,15 +741,12 @@ void installWebApi() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
#if defined(ENABLE_RTPPROXY)
|
#if defined(ENABLE_RTPPROXY)
|
||||||
api_regist1("/index/api/getSsrcInfo",[](API_ARGS1){
|
api_regist1("/index/api/getRtpInfo",[](API_ARGS1){
|
||||||
CHECK_SECRET();
|
CHECK_SECRET();
|
||||||
CHECK_ARGS("ssrc");
|
CHECK_ARGS("stream_id");
|
||||||
uint32_t ssrc = 0;
|
|
||||||
stringstream ss(allArgs["ssrc"]);
|
|
||||||
ss >> std::hex >> ssrc;
|
|
||||||
|
|
||||||
auto process = RtpSelector::Instance().getProcess(ssrc,false);
|
auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false);
|
||||||
if(!process){
|
if (!process) {
|
||||||
val["exist"] = false;
|
val["exist"] = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -760,10 +757,10 @@ void installWebApi() {
|
|||||||
|
|
||||||
api_regist1("/index/api/openRtpServer",[](API_ARGS1){
|
api_regist1("/index/api/openRtpServer",[](API_ARGS1){
|
||||||
CHECK_SECRET();
|
CHECK_SECRET();
|
||||||
CHECK_ARGS("port", "enable_tcp");
|
CHECK_ARGS("port", "enable_tcp", "stream_id");
|
||||||
|
|
||||||
RtpServer::Ptr server = std::make_shared<RtpServer>();
|
RtpServer::Ptr server = std::make_shared<RtpServer>();
|
||||||
server->start(allArgs["port"], allArgs["enable_tcp"].as<bool>());
|
server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as<bool>());
|
||||||
val["port"] = server->getPort();
|
val["port"] = server->getPort();
|
||||||
|
|
||||||
//保存对象
|
//保存对象
|
||||||
|
@ -16,32 +16,21 @@
|
|||||||
|
|
||||||
namespace mediakit{
|
namespace mediakit{
|
||||||
|
|
||||||
string printSSRC(uint32_t ui32Ssrc) {
|
|
||||||
char tmp[9] = { 0 };
|
|
||||||
ui32Ssrc = htonl(ui32Ssrc);
|
|
||||||
uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
|
|
||||||
for (int i = 0; i < 4; i++) {
|
|
||||||
sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
|
|
||||||
}
|
|
||||||
return tmp;
|
|
||||||
}
|
|
||||||
|
|
||||||
static string printAddress(const struct sockaddr *addr){
|
static string printAddress(const struct sockaddr *addr){
|
||||||
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
|
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
RtpProcess::RtpProcess(uint32_t ssrc) {
|
RtpProcess::RtpProcess(const string &stream_id) {
|
||||||
_ssrc = ssrc;
|
|
||||||
_track = std::make_shared<SdpTrack>();
|
_track = std::make_shared<SdpTrack>();
|
||||||
_track->_interleaved = 0;
|
_track->_interleaved = 0;
|
||||||
_track->_samplerate = 90000;
|
_track->_samplerate = 90000;
|
||||||
_track->_type = TrackVideo;
|
_track->_type = TrackVideo;
|
||||||
_track->_ssrc = _ssrc;
|
_track->_ssrc = 0;
|
||||||
|
|
||||||
_media_info._schema = RTP_APP_NAME;
|
_media_info._schema = RTP_APP_NAME;
|
||||||
_media_info._vhost = DEFAULT_VHOST;
|
_media_info._vhost = DEFAULT_VHOST;
|
||||||
_media_info._app = RTP_APP_NAME;
|
_media_info._app = RTP_APP_NAME;
|
||||||
_media_info._streamid = printSSRC(_ssrc);
|
_media_info._streamid = stream_id;
|
||||||
|
|
||||||
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
|
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
|
||||||
{
|
{
|
||||||
|
@ -22,11 +22,10 @@ using namespace mediakit;
|
|||||||
|
|
||||||
namespace mediakit{
|
namespace mediakit{
|
||||||
|
|
||||||
string printSSRC(uint32_t ui32Ssrc);
|
|
||||||
class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this<RtpProcess>{
|
class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this<RtpProcess>{
|
||||||
public:
|
public:
|
||||||
typedef std::shared_ptr<RtpProcess> Ptr;
|
typedef std::shared_ptr<RtpProcess> Ptr;
|
||||||
RtpProcess(uint32_t ssrc);
|
RtpProcess(const string &stream_id);
|
||||||
~RtpProcess();
|
~RtpProcess();
|
||||||
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
|
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
|
||||||
bool alive();
|
bool alive();
|
||||||
@ -54,7 +53,6 @@ private:
|
|||||||
std::shared_ptr<FILE> _save_file_rtp;
|
std::shared_ptr<FILE> _save_file_rtp;
|
||||||
std::shared_ptr<FILE> _save_file_ps;
|
std::shared_ptr<FILE> _save_file_ps;
|
||||||
std::shared_ptr<FILE> _save_file_video;
|
std::shared_ptr<FILE> _save_file_video;
|
||||||
uint32_t _ssrc;
|
|
||||||
SdpTrack::Ptr _track;
|
SdpTrack::Ptr _track;
|
||||||
struct sockaddr *_addr = nullptr;
|
struct sockaddr *_addr = nullptr;
|
||||||
uint16_t _sequence = 0;
|
uint16_t _sequence = 0;
|
||||||
|
@ -15,37 +15,44 @@ namespace mediakit{
|
|||||||
|
|
||||||
INSTANCE_IMP(RtpSelector);
|
INSTANCE_IMP(RtpSelector);
|
||||||
|
|
||||||
bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
|
bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len,
|
||||||
uint32_t ssrc = 0;
|
const struct sockaddr *addr,uint32_t *dts_out) {
|
||||||
if(!getSSRC(data,data_len,ssrc)){
|
if (stream_id.empty()) {
|
||||||
WarnL << "get ssrc from rtp failed:" << data_len;
|
//未指定流id,那么使用ssrc为流id
|
||||||
return false;
|
uint32_t ssrc = 0;
|
||||||
|
if (!getSSRC(data, data_len, ssrc)) {
|
||||||
|
WarnL << "get ssrc from rtp failed:" << data_len;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
stream_id = printSSRC(ssrc);
|
||||||
}
|
}
|
||||||
auto process = getProcess(ssrc, true);
|
|
||||||
if(process){
|
//假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流)
|
||||||
return process->inputRtp(sock, data,data_len, addr,dts_out);
|
auto process = getProcess(stream_id, true);
|
||||||
|
if (process) {
|
||||||
|
return process->inputRtp(sock, data, data_len, addr, dts_out);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){
|
bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){
|
||||||
if(data_len < 12){
|
if (data_len < 12) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
uint32_t *ssrc_ptr = (uint32_t *)(data + 8);
|
uint32_t *ssrc_ptr = (uint32_t *) (data + 8);
|
||||||
ssrc = ntohl(*ssrc_ptr);
|
ssrc = ntohl(*ssrc_ptr);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) {
|
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
|
||||||
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
|
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
|
||||||
auto it = _map_rtp_process.find(ssrc);
|
auto it = _map_rtp_process.find(stream_id);
|
||||||
if(it == _map_rtp_process.end() && !makeNew){
|
if (it == _map_rtp_process.end() && !makeNew) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc];
|
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id];
|
||||||
if(!ref){
|
if (!ref) {
|
||||||
ref = std::make_shared<RtpProcessHelper>(ssrc,shared_from_this());
|
ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this());
|
||||||
ref->attachEvent();
|
ref->attachEvent();
|
||||||
createTimer();
|
createTimer();
|
||||||
}
|
}
|
||||||
@ -67,17 +74,15 @@ void RtpSelector::createTimer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) {
|
void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
|
||||||
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
|
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
|
||||||
auto it = _map_rtp_process.find(ssrc);
|
auto it = _map_rtp_process.find(stream_id);
|
||||||
if(it == _map_rtp_process.end()){
|
if (it == _map_rtp_process.end()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (it->second->getProcess().get() != ptr) {
|
||||||
if(it->second->getProcess().get() != ptr){
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
_map_rtp_process.erase(it);
|
_map_rtp_process.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,7 +93,7 @@ void RtpSelector::onManager() {
|
|||||||
++it;
|
++it;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
WarnL << "RtpProcess timeout:" << printSSRC(it->first);
|
WarnL << "RtpProcess timeout:" << it->first;
|
||||||
it = _map_rtp_process.erase(it);
|
it = _map_rtp_process.erase(it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,10 +104,10 @@ RtpSelector::RtpSelector() {
|
|||||||
RtpSelector::~RtpSelector() {
|
RtpSelector::~RtpSelector() {
|
||||||
}
|
}
|
||||||
|
|
||||||
RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr<RtpSelector> &parent) {
|
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
|
||||||
_ssrc = ssrc;
|
_stream_id = stream_id;
|
||||||
_parent = parent;
|
_parent = parent;
|
||||||
_process = std::make_shared<RtpProcess>(_ssrc);
|
_process = std::make_shared<RtpProcess>(stream_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
RtpProcessHelper::~RtpProcessHelper() {
|
RtpProcessHelper::~RtpProcessHelper() {
|
||||||
@ -114,14 +119,14 @@ void RtpProcessHelper::attachEvent() {
|
|||||||
|
|
||||||
bool RtpProcessHelper::close(MediaSource &sender, bool force) {
|
bool RtpProcessHelper::close(MediaSource &sender, bool force) {
|
||||||
//此回调在其他线程触发
|
//此回调在其他线程触发
|
||||||
if(!_process || (!force && _process->totalReaderCount())){
|
if (!_process || (!force && _process->totalReaderCount())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
auto parent = _parent.lock();
|
auto parent = _parent.lock();
|
||||||
if(!parent){
|
if (!parent) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
parent->delProcess(_ssrc,_process.get());
|
parent->delProcess(_stream_id, _process.get());
|
||||||
WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
|
WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -24,19 +24,21 @@ class RtpSelector;
|
|||||||
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
|
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
|
||||||
public:
|
public:
|
||||||
typedef std::shared_ptr<RtpProcessHelper> Ptr;
|
typedef std::shared_ptr<RtpProcessHelper> Ptr;
|
||||||
RtpProcessHelper(uint32_t ssrc,const weak_ptr<RtpSelector > &parent);
|
RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector > &parent);
|
||||||
~RtpProcessHelper();
|
~RtpProcessHelper();
|
||||||
void attachEvent();
|
void attachEvent();
|
||||||
RtpProcess::Ptr & getProcess();
|
RtpProcess::Ptr & getProcess();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// 通知其停止推流
|
// 通知其停止推流
|
||||||
bool close(MediaSource &sender,bool force) override;
|
bool close(MediaSource &sender,bool force) override;
|
||||||
// 观看总人数
|
// 观看总人数
|
||||||
int totalReaderCount(MediaSource &sender) override;
|
int totalReaderCount(MediaSource &sender) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
weak_ptr<RtpSelector > _parent;
|
weak_ptr<RtpSelector > _parent;
|
||||||
RtpProcess::Ptr _process;
|
RtpProcess::Ptr _process;
|
||||||
uint32_t _ssrc = 0;
|
string _stream_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
|
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
|
||||||
@ -44,16 +46,21 @@ public:
|
|||||||
RtpSelector();
|
RtpSelector();
|
||||||
~RtpSelector();
|
~RtpSelector();
|
||||||
|
|
||||||
static RtpSelector &Instance();
|
|
||||||
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
|
|
||||||
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
|
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
|
||||||
RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew);
|
static RtpSelector &Instance();
|
||||||
void delProcess(uint32_t ssrc,const RtpProcess *ptr);
|
|
||||||
|
bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len,
|
||||||
|
const struct sockaddr *addr, uint32_t *dts_out = nullptr);
|
||||||
|
|
||||||
|
RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew);
|
||||||
|
void delProcess(const string &stream_id, const RtpProcess *ptr);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void onManager();
|
void onManager();
|
||||||
void createTimer();
|
void createTimer();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
unordered_map<uint32_t,RtpProcessHelper::Ptr> _map_rtp_process;
|
unordered_map<string,RtpProcessHelper::Ptr> _map_rtp_process;
|
||||||
recursive_mutex _mtx_map;
|
recursive_mutex _mtx_map;
|
||||||
Timer::Ptr _timer;
|
Timer::Ptr _timer;
|
||||||
};
|
};
|
||||||
|
@ -22,12 +22,12 @@ RtpServer::~RtpServer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_ip) {
|
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
|
||||||
_udp_server.reset(new Socket(nullptr, false));
|
_udp_server.reset(new Socket(nullptr, false));
|
||||||
auto &ref = RtpSelector::Instance();
|
auto &ref = RtpSelector::Instance();
|
||||||
auto sock = _udp_server;
|
auto sock = _udp_server;
|
||||||
_udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
|
_udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
|
||||||
ref.inputRtp(sock, buf->data(), buf->size(), addr);
|
ref.inputRtp(sock, const_cast<string &>(stream_id), buf->data(), buf->size(), addr);
|
||||||
});
|
});
|
||||||
|
|
||||||
//创建udp服务器
|
//创建udp服务器
|
||||||
@ -43,6 +43,7 @@ void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_i
|
|||||||
try {
|
try {
|
||||||
//创建tcp服务器
|
//创建tcp服务器
|
||||||
_tcp_server = std::make_shared<TcpServer>(_udp_server->getPoller());
|
_tcp_server = std::make_shared<TcpServer>(_udp_server->getPoller());
|
||||||
|
(*_tcp_server)[RtpSession::kStreamID] = stream_id;
|
||||||
_tcp_server->start<RtpSession>(_udp_server->get_local_port(), local_ip);
|
_tcp_server->start<RtpSession>(_udp_server->get_local_port(), local_ip);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
_tcp_server = nullptr;
|
_tcp_server = nullptr;
|
||||||
|
@ -36,10 +36,11 @@ public:
|
|||||||
/**
|
/**
|
||||||
* 开启服务器,可能抛异常
|
* 开启服务器,可能抛异常
|
||||||
* @param local_port 本地端口,0时为随机端口
|
* @param local_port 本地端口,0时为随机端口
|
||||||
|
* @param stream_id 流id,置空则使用ssrc
|
||||||
* @param enable_tcp 是否启用tcp服务器
|
* @param enable_tcp 是否启用tcp服务器
|
||||||
* @param local_ip 绑定的本地网卡ip
|
* @param local_ip 绑定的本地网卡ip
|
||||||
*/
|
*/
|
||||||
void start(uint16_t local_port, bool enable_tcp = true, const char *local_ip = "0.0.0.0");
|
void start(uint16_t local_port, const string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取绑定的本地端口
|
* 获取绑定的本地端口
|
||||||
|
@ -11,8 +11,15 @@
|
|||||||
#if defined(ENABLE_RTPPROXY)
|
#if defined(ENABLE_RTPPROXY)
|
||||||
#include "RtpSession.h"
|
#include "RtpSession.h"
|
||||||
#include "RtpSelector.h"
|
#include "RtpSelector.h"
|
||||||
|
#include "Network/TcpServer.h"
|
||||||
namespace mediakit{
|
namespace mediakit{
|
||||||
|
|
||||||
|
const string RtpSession::kStreamID = "stream_id";
|
||||||
|
|
||||||
|
void RtpSession::attachServer(const TcpServer &server) {
|
||||||
|
_stream_id = const_cast<TcpServer &>(server)[kStreamID];
|
||||||
|
}
|
||||||
|
|
||||||
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
|
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
|
||||||
DebugP(this);
|
DebugP(this);
|
||||||
socklen_t addr_len = sizeof(addr);
|
socklen_t addr_len = sizeof(addr);
|
||||||
@ -21,7 +28,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
|
|||||||
RtpSession::~RtpSession() {
|
RtpSession::~RtpSession() {
|
||||||
DebugP(this);
|
DebugP(this);
|
||||||
if(_process){
|
if(_process){
|
||||||
RtpSelector::Instance().delProcess(_ssrc,_process.get());
|
RtpSelector::Instance().delProcess(_stream_id,_process.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,7 +43,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void RtpSession::onError(const SockException &err) {
|
void RtpSession::onError(const SockException &err) {
|
||||||
WarnL << _ssrc << " " << err.what();
|
WarnL << _stream_id << " " << err.what();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtpSession::onManager() {
|
void RtpSession::onManager() {
|
||||||
@ -51,13 +58,19 @@ void RtpSession::onManager() {
|
|||||||
|
|
||||||
void RtpSession::onRtpPacket(const char *data, uint64_t len) {
|
void RtpSession::onRtpPacket(const char *data, uint64_t len) {
|
||||||
if (!_process) {
|
if (!_process) {
|
||||||
if (!RtpSelector::getSSRC(data + 2, len - 2, _ssrc)) {
|
uint32_t ssrc;
|
||||||
|
if (!RtpSelector::getSSRC(data + 2, len - 2, ssrc)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_process = RtpSelector::Instance().getProcess(_ssrc, true);
|
if (_stream_id.empty()) {
|
||||||
|
//未指定流id就使用ssrc为流id
|
||||||
|
_stream_id = printSSRC(ssrc);
|
||||||
|
}
|
||||||
|
//tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess
|
||||||
|
_process = RtpSelector::Instance().getProcess(_stream_id, true);
|
||||||
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
|
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
|
||||||
}
|
}
|
||||||
_process->inputRtp(_sock,data + 2, len - 2, &addr);
|
_process->inputRtp(_sock, data + 2, len - 2, &addr);
|
||||||
_ticker.resetTime();
|
_ticker.resetTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,22 +22,26 @@ namespace mediakit{
|
|||||||
|
|
||||||
class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{
|
class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{
|
||||||
public:
|
public:
|
||||||
|
static const string kStreamID;
|
||||||
RtpSession(const Socket::Ptr &sock);
|
RtpSession(const Socket::Ptr &sock);
|
||||||
~RtpSession() override;
|
~RtpSession() override;
|
||||||
void onRecv(const Buffer::Ptr &) override;
|
void onRecv(const Buffer::Ptr &) override;
|
||||||
void onError(const SockException &err) override;
|
void onError(const SockException &err) override;
|
||||||
void onManager() override;
|
void onManager() override;
|
||||||
|
void attachServer(const TcpServer &server) override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// 通知其停止推流
|
// 通知其停止推流
|
||||||
bool close(MediaSource &sender,bool force) override;
|
bool close(MediaSource &sender,bool force) override;
|
||||||
// 观看总人数
|
// 观看总人数
|
||||||
int totalReaderCount(MediaSource &sender) override;
|
int totalReaderCount(MediaSource &sender) override;
|
||||||
void onRtpPacket(const char *data,uint64_t len) override;
|
void onRtpPacket(const char *data,uint64_t len) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint32_t _ssrc = 0;
|
|
||||||
RtpProcess::Ptr _process;
|
RtpProcess::Ptr _process;
|
||||||
Ticker _ticker;
|
Ticker _ticker;
|
||||||
struct sockaddr addr;
|
struct sockaddr addr;
|
||||||
|
string _stream_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
}//namespace mediakit
|
}//namespace mediakit
|
||||||
|
@ -38,6 +38,7 @@ static bool loadFile(const char *path){
|
|||||||
uint16_t len;
|
uint16_t len;
|
||||||
char rtp[2 * 1024];
|
char rtp[2 * 1024];
|
||||||
struct sockaddr addr = {0};
|
struct sockaddr addr = {0};
|
||||||
|
string stream_id;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (2 != fread(&len, 1, 2, fp)) {
|
if (2 != fread(&len, 1, 2, fp)) {
|
||||||
WarnL;
|
WarnL;
|
||||||
@ -55,7 +56,7 @@ static bool loadFile(const char *path){
|
|||||||
}
|
}
|
||||||
|
|
||||||
uint32_t timeStamp;
|
uint32_t timeStamp;
|
||||||
RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp);
|
RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp);
|
||||||
if(timeStamp_last){
|
if(timeStamp_last){
|
||||||
auto diff = timeStamp - timeStamp_last;
|
auto diff = timeStamp - timeStamp_last;
|
||||||
if(diff > 0 && diff < 500){
|
if(diff > 0 && diff < 500){
|
||||||
|
Loading…
Reference in New Issue
Block a user