ZLMediaKit/src/Rtp/RtpProcess.cpp

255 lines
8.0 KiB
C++
Raw Normal View History

2019-12-05 19:20:12 +08:00
/*
2020-04-04 20:30:09 +08:00
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
2019-12-06 11:54:10 +08:00
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
2020-04-04 20:30:09 +08:00
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
2019-12-06 11:54:10 +08:00
*/
2019-12-05 19:20:12 +08:00
2019-12-06 11:54:10 +08:00
#if defined(ENABLE_RTPPROXY)
2019-12-05 19:20:12 +08:00
#include "RtpProcess.h"
#include "Util/File.h"
2020-05-17 18:00:23 +08:00
#include "Http/HttpTSPlayer.h"
2020-04-23 16:14:24 +08:00
#define RTP_APP_NAME "rtp"
2019-12-05 19:20:12 +08:00
2019-12-06 11:54:10 +08:00
namespace mediakit{
2019-12-05 19:20:12 +08:00
static string printAddress(const struct sockaddr *addr){
2020-04-23 16:14:24 +08:00
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
2019-12-05 19:20:12 +08:00
}
RtpProcess::RtpProcess(const string &stream_id) {
2019-12-05 19:20:12 +08:00
_track = std::make_shared<SdpTrack>();
_track->_interleaved = 0;
_track->_samplerate = 90000;
_track->_type = TrackVideo;
_track->_ssrc = 0;
2020-04-23 23:18:24 +08:00
_media_info._schema = RTP_APP_NAME;
_media_info._vhost = DEFAULT_VHOST;
_media_info._app = RTP_APP_NAME;
_media_info._streamid = stream_id;
2019-12-05 19:20:12 +08:00
2019-12-06 11:54:10 +08:00
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
2019-12-05 19:20:12 +08:00
{
2020-04-24 12:39:22 +08:00
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr;
2019-12-05 19:20:12 +08:00
if(fp){
_save_file_rtp.reset(fp,[](FILE *fp){
fclose(fp);
});
}
}
{
2020-04-24 12:39:22 +08:00
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr;
2019-12-05 19:20:12 +08:00
if(fp){
_save_file_ps.reset(fp,[](FILE *fp){
fclose(fp);
});
}
}
{
2020-04-24 12:39:22 +08:00
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr;
2019-12-05 19:20:12 +08:00
if(fp){
_save_file_video.reset(fp,[](FILE *fp){
fclose(fp);
});
}
}
}
RtpProcess::~RtpProcess() {
2020-04-23 23:18:24 +08:00
uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
WarnP(this) << "RTP推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
2020-04-24 12:39:22 +08:00
if (_total_bytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
2019-12-05 19:20:12 +08:00
}
2020-06-28 15:21:41 +08:00
if (_addr) {
delete _addr;
_addr = nullptr;
}
2019-12-05 19:20:12 +08:00
}
2020-04-23 23:30:24 +08:00
bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
2019-12-06 11:54:10 +08:00
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
2019-12-05 19:20:12 +08:00
//检查源是否合法
if(!_addr){
_addr = new struct sockaddr;
2020-04-23 23:33:58 +08:00
_sock = sock;
2019-12-05 19:20:12 +08:00
memcpy(_addr,addr, sizeof(struct sockaddr));
2020-04-23 23:18:24 +08:00
DebugP(this) << "bind to address:" << printAddress(_addr);
//推流鉴权
emitOnPublish();
}
if(!_muxer){
//无权限推流
return false;
2019-12-05 19:20:12 +08:00
}
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
2020-04-23 23:18:24 +08:00
DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
2019-12-05 19:20:12 +08:00
return false;
}
2020-04-24 12:39:22 +08:00
_total_bytes += data_len;
bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len);
if(dts_out){
*dts_out = _dts;
}
return ret;
2019-12-05 19:20:12 +08:00
}
2020-03-08 21:19:20 +08:00
//判断是否为ts负载
static inline bool checkTS(const uint8_t *packet, int bytes){
2020-05-17 18:00:23 +08:00
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
}
2019-12-05 19:20:12 +08:00
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
2020-05-30 18:33:28 +08:00
if(rtp->sequence != _sequence + 1 && _sequence != 0){
2020-05-25 14:28:02 +08:00
WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流";
2019-12-05 19:20:12 +08:00
}
_sequence = rtp->sequence;
if(_save_file_rtp){
uint16_t size = rtp->size() - 4;
size = htons(size);
fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get());
fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get());
}
2020-03-08 21:19:20 +08:00
decodeRtp(rtp->data() + 4 ,rtp->size() - 4);
2019-12-05 19:20:12 +08:00
}
2020-03-08 21:19:20 +08:00
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) {
2019-12-05 19:20:12 +08:00
if(_save_file_ps){
fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
}
2020-05-17 18:00:23 +08:00
if (!_decoder) {
2020-03-06 13:00:06 +08:00
//创建解码器
2020-05-17 18:00:23 +08:00
if (checkTS(packet, bytes)) {
2020-03-06 13:00:06 +08:00
//猜测是ts负载
2020-04-23 23:30:24 +08:00
InfoP(this) << "judged to be TS";
2020-05-21 11:44:57 +08:00
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
2020-05-17 18:00:23 +08:00
} else {
2020-03-06 13:00:06 +08:00
//猜测是ps负载
2020-04-23 23:30:24 +08:00
InfoP(this) << "judged to be PS";
2020-05-21 11:44:57 +08:00
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, this);
2020-03-06 13:00:06 +08:00
}
}
2020-05-17 18:00:23 +08:00
if (_decoder) {
auto ret = _decoder->input((uint8_t *) packet, bytes);
if (ret != bytes) {
WarnP(this) << ret << " != " << bytes << " " << flags;
}
2019-12-05 19:20:12 +08:00
}
}
2020-05-17 18:00:23 +08:00
void RtpProcess::inputFrame(const Frame::Ptr &frame){
_last_rtp_time.resetTime();
2020-05-17 18:00:23 +08:00
_dts = frame->dts();
if (_save_file_video && frame->getTrackType() == TrackVideo) {
fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
2020-02-25 19:00:22 +08:00
}
2020-05-17 18:00:23 +08:00
_muxer->inputFrame(frame);
2020-02-25 19:00:22 +08:00
}
2020-05-17 18:00:23 +08:00
void RtpProcess::addTrack(const Track::Ptr & track){
_muxer->addTrack(track);
2019-12-05 19:20:12 +08:00
}
bool RtpProcess::alive() {
2019-12-06 11:54:10 +08:00
GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec)
2019-12-05 19:20:12 +08:00
if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){
return true;
}
return false;
}
2020-04-24 12:39:22 +08:00
string RtpProcess::get_peer_ip() {
if(_addr){
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
2020-04-23 23:18:24 +08:00
}
2020-04-24 12:39:22 +08:00
return "0.0.0.0";
2019-12-05 19:20:12 +08:00
}
uint16_t RtpProcess::get_peer_port() {
2020-04-23 23:18:24 +08:00
if(!_addr){
return 0;
}
2019-12-05 19:20:12 +08:00
return ntohs(((struct sockaddr_in *) _addr)->sin_port);
}
2019-12-06 11:54:10 +08:00
2020-04-24 12:39:22 +08:00
string RtpProcess::get_local_ip() {
2020-04-23 23:30:24 +08:00
if(_sock){
2020-04-24 12:39:22 +08:00
return _sock->get_local_ip();
2020-04-23 23:30:24 +08:00
}
2020-04-24 12:39:22 +08:00
return "0.0.0.0";
2020-04-23 23:18:24 +08:00
}
uint16_t RtpProcess::get_local_port() {
2020-04-23 23:30:24 +08:00
if(_sock){
return _sock->get_local_port();
}
2020-04-23 23:18:24 +08:00
return 0;
}
string RtpProcess::getIdentifier() const{
return _media_info._streamid;
}
2020-02-28 16:25:14 +08:00
int RtpProcess::totalReaderCount(){
2020-04-23 23:18:24 +08:00
return _muxer ? _muxer->totalReaderCount() : 0;
2020-02-28 16:25:14 +08:00
}
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
2020-04-23 23:18:24 +08:00
if(_muxer){
_muxer->setMediaListener(listener);
}else{
_listener = listener;
}
}
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
if (err.empty()) {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
strongSelf->_media_info._app,
strongSelf->_media_info._streamid, 0,
enableRtxp, enableRtxp, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf->_listener);
InfoP(strongSelf) << "允许RTP推流";
} else {
WarnP(strongSelf) << "禁止RTP推流:" << err;
}
};
//触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
invoker("", toRtxp, toHls, toMP4);
}
2020-02-28 16:25:14 +08:00
}
2019-12-06 11:54:10 +08:00
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)