完善GB28181推流

This commit is contained in:
xiongziliang 2020-07-08 09:36:10 +08:00
parent 477f99b756
commit 248b2d5cb9
6 changed files with 74 additions and 21 deletions

View File

@ -27,9 +27,24 @@ public:
typedef std::shared_ptr<RtpProcess> Ptr; typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(const string &stream_id); RtpProcess(const string &stream_id);
~RtpProcess(); ~RtpProcess();
/**
* rtp
* @param sock socket
* @param data rtp数据指针
* @param data_len rtp数据长度
* @param addr
* @param dts_out dts
* @return
*/
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
/**
*
*/
bool alive(); bool alive();
/// SockInfo override
string get_local_ip() override; string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
string get_peer_ip() override; string get_peer_ip() override;

View File

@ -15,20 +15,17 @@ namespace mediakit{
INSTANCE_IMP(RtpSelector); INSTANCE_IMP(RtpSelector);
bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,
const struct sockaddr *addr,uint32_t *dts_out) { const struct sockaddr *addr,uint32_t *dts_out) {
if (stream_id.empty()) { //使用ssrc为流id
//未指定流id那么使用ssrc为流id
uint32_t ssrc = 0; uint32_t ssrc = 0;
if (!getSSRC(data, data_len, ssrc)) { if (!getSSRC(data, data_len, ssrc)) {
WarnL << "get ssrc from rtp failed:" << data_len; WarnL << "get ssrc from rtp failed:" << data_len;
return false; return false;
} }
stream_id = printSSRC(ssrc);
}
//假定指定了流id那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) //假定指定了流id那么通过流id来区分是否为一路流(哪怕可能同时收到多路流)
auto process = getProcess(stream_id, true); auto process = getProcess(printSSRC(ssrc), true);
if (process) { if (process) {
return process->inputRtp(sock, data, data_len, addr, dts_out); return process->inputRtp(sock, data, data_len, addr, dts_out);
} }

View File

@ -49,10 +49,31 @@ public:
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
static RtpSelector &Instance(); static RtpSelector &Instance();
bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, /**
* rtp流ssrc分流
* @param sock socket
* @param data
* @param data_len
* @param addr rtp流源地址
* @param dts_out dts
* @return
*/
bool inputRtp(const Socket::Ptr &sock, const char *data, int data_len,
const struct sockaddr *addr, uint32_t *dts_out = nullptr); const struct sockaddr *addr, uint32_t *dts_out = nullptr);
/**
* rtp处理器
* @param stream_id id
* @param makeNew
* @return rtp处理器
*/
RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew); RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew);
/**
* rtp处理器
* @param stream_id id
* @param ptr rtp处理器指针
*/
void delProcess(const string &stream_id, const RtpProcess *ptr); void delProcess(const string &stream_id, const RtpProcess *ptr);
private: private:

View File

@ -17,25 +17,20 @@ RtpServer::RtpServer() {
} }
RtpServer::~RtpServer() { RtpServer::~RtpServer() {
if(_udp_server){ if(_on_clearup){
_udp_server->setOnRead(nullptr); _on_clearup();
} }
} }
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
_udp_server.reset(new Socket(nullptr, false)); _udp_server.reset(new Socket(nullptr, false));
auto &ref = RtpSelector::Instance();
auto sock = _udp_server;
_udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(sock, const_cast<string &>(stream_id), buf->data(), buf->size(), addr);
});
//创建udp服务器 //创建udp服务器
if (!_udp_server->bindUdpSock(local_port, local_ip)) { if (!_udp_server->bindUdpSock(local_port, local_ip)) {
_udp_server = nullptr; _udp_server = nullptr;
string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true));
throw std::runtime_error(err); throw std::runtime_error(err);
} }
//设置udp socket读缓存 //设置udp socket读缓存
SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024);
@ -51,6 +46,31 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable
throw; throw;
} }
} }
auto sock = _udp_server;
RtpProcess::Ptr process;
if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
process = RtpSelector::Instance().getProcess(stream_id, true);
_udp_server->setOnRead([sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
process->inputRtp(sock, buf->data(), buf->size(), addr);
});
} else {
//未指定流id一个端口多个流通过ssrc来分流
auto &ref = RtpSelector::Instance();
_udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(sock, buf->data(), buf->size(), addr);
});
}
_on_clearup = [sock, process, stream_id]() {
//去除循环引用
sock->setOnRead(nullptr);
if (process) {
//删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
}
};
} }
EventPoller::Ptr RtpServer::getPoller() { EventPoller::Ptr RtpServer::getPoller() {

View File

@ -55,6 +55,7 @@ public:
protected: protected:
Socket::Ptr _udp_server; Socket::Ptr _udp_server;
TcpServer::Ptr _tcp_server; TcpServer::Ptr _tcp_server;
function<void()> _on_clearup;
}; };
}//namespace mediakit }//namespace mediakit

View File

@ -38,7 +38,6 @@ static bool loadFile(const char *path){
uint16_t len; uint16_t len;
char rtp[2 * 1024]; char rtp[2 * 1024];
struct sockaddr addr = {0}; struct sockaddr addr = {0};
string stream_id;
while (true) { while (true) {
if (2 != fread(&len, 1, 2, fp)) { if (2 != fread(&len, 1, 2, fp)) {
WarnL; WarnL;
@ -56,7 +55,7 @@ static bool loadFile(const char *path){
} }
uint32_t timeStamp; uint32_t timeStamp;
RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp); RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp);
if(timeStamp_last){ if(timeStamp_last){
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if(diff > 0 && diff < 500){ if(diff > 0 && diff < 500){