ZLMediaKit/src/Rtp/RtpProcess.cpp

306 lines
10 KiB
C++
Raw Normal View History

2019-12-05 19:20:12 +08:00
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */
2019-12-05 19:20:12 +08:00
2019-12-06 11:54:10 +08:00
#if defined(ENABLE_RTPPROXY)
#include "mpeg-ps.h"
2019-12-05 19:20:12 +08:00
#include "RtpProcess.h"
#include "Util/File.h"
#include "Extension/H265.h"
#include "Extension/AAC.h"
2019-12-06 11:54:10 +08:00
namespace mediakit{
2019-12-05 19:20:12 +08:00
/**
 * Groups consecutive frames that carry the same dts and hands each finished
 * group to a callback as one buffer.
 *
 * Frames are cached until a frame with a different dts arrives. At that point
 * the cached group is emitted and the new frame starts the next group.
 */
class FrameMerger {
public:
    FrameMerger() = default;
    virtual ~FrameMerger() = default;

    /**
     * Feeds one frame into the merger.
     * @param frame Incoming frame; it is stored via Frame::getCacheAbleFrame().
     * @param cb Called with the dts, pts and data of the cached group when the
     *           incoming frame's dts differs from the dts of the last cached frame.
     *           The dts/pts passed are those of the last cached frame.
     */
    void inputFrame(const Frame::Ptr &frame, const function<void(uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer)> &cb) {
        const bool dts_changed = !_frameCached.empty() && _frameCached.back()->dts() != frame->dts();
        if (dts_changed) {
            const Frame::Ptr last = _frameCached.back();
            Buffer::Ptr payload;
            if (_frameCached.size() == 1) {
                // A single cached frame is forwarded as-is, without copying.
                payload = last;
            } else {
                // Several frames share the dts: concatenate their bytes in arrival order.
                string joined;
                _frameCached.for_each([&joined](const Frame::Ptr &cached) {
                    joined.append(cached->data(), cached->size());
                });
                payload = std::make_shared<BufferString>(std::move(joined));
            }
            cb(last->dts(), last->pts(), payload);
            _frameCached.clear();
        }
        _frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
    }

private:
    // Frames of the group currently being collected (all share one dts).
    List<Frame::Ptr> _frameCached;
};
2019-12-05 19:20:12 +08:00
/**
 * Formats an SSRC as exactly eight uppercase hexadecimal digits,
 * most significant byte first (e.g. 0x000000FF -> "000000FF").
 * @param ui32Ssrc SSRC value in host byte order.
 * @return The 8-character hex representation.
 */
string printSSRC(uint32_t ui32Ssrc) {
    static const char kHexDigits[] = "0123456789ABCDEF";
    string text(8, '0');
    // Fill from the least significant nibble backwards so the most
    // significant nibble ends up at the front of the string.
    for (int pos = 7; pos >= 0; --pos) {
        text[pos] = kHexDigits[ui32Ssrc & 0x0F];
        ui32Ssrc >>= 4;
    }
    return text;
}
/**
 * Renders a socket address as "ip:port".
 * The address is always interpreted as a sockaddr_in (IPv4); the address
 * family is not inspected.
 * @param addr Address to print; must not be null.
 */
static string printAddress(const struct sockaddr *addr) {
    const struct sockaddr_in *ipv4 = reinterpret_cast<const struct sockaddr_in *>(addr);
    return StrPrinter << inet_ntoa(ipv4->sin_addr) << ":" << ntohs(ipv4->sin_port);
}
/**
 * Creates the processing state for one RTP stream identified by its SSRC.
 *
 * Sets up the track description (video, 90000 Hz sample rate), the muxer that
 * publishes under app "rtp" with the hex SSRC as stream id, optional dump files
 * and the frame merger.
 * @param ssrc SSRC of the RTP stream handled by this object.
 */
RtpProcess::RtpProcess(uint32_t ssrc) {
    _ssrc = ssrc;

    _track = std::make_shared<SdpTrack>();
    _track->_interleaved = 0;
    _track->_samplerate = 90000;
    _track->_type = TrackVideo;
    _track->_ssrc = _ssrc;

    DebugL << printSSRC(_ssrc);

    GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
    GET_CONFIG(bool, toHls, General::kPublishToHls);
    GET_CONFIG(bool, toMP4, General::kPublishToMP4);
    // The same toRtxp switch is passed for both the 5th and 6th muxer arguments.
    _muxer = std::make_shared<MultiMediaSourceMuxer>(DEFAULT_VHOST, "rtp", printSSRC(_ssrc), 0, toRtxp, toRtxp, toHls, toMP4);

    GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);

    // Opens "<dump_dir>/<SSRC><suffix>" for binary writing. Returns an empty
    // pointer when dumping is disabled (empty dump_dir) or the file cannot be
    // created; otherwise the file is closed when the last owner releases it.
    auto open_dump = [&](const char *suffix) -> std::shared_ptr<FILE> {
        if (dump_dir.empty()) {
            return nullptr;
        }
        FILE *handle = File::createfile_file(File::absolutePath(printSSRC(_ssrc) + suffix, dump_dir).data(), "wb");
        if (!handle) {
            return nullptr;
        }
        return std::shared_ptr<FILE>(handle, [](FILE *to_close) {
            fclose(to_close);
        });
    };

    _save_file_rtp = open_dump(".rtp");
    _save_file_ps = open_dump(".mp2");
    _save_file_video = open_dump(".video");

    _merger = std::make_shared<FrameMerger>();
}
/**
 * Logs the SSRC (and the bound peer address, if one was recorded) and
 * releases the stored peer address.
 */
RtpProcess::~RtpProcess() {
    if (!_addr) {
        // No packet was ever bound to this process.
        DebugL << printSSRC(_ssrc);
        return;
    }
    DebugL << printSSRC(_ssrc) << " " << printAddress(_addr);
    delete _addr;
}
bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
2019-12-06 11:54:10 +08:00
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
2019-12-05 19:20:12 +08:00
//检查源是否合法
if(!_addr){
_addr = new struct sockaddr;
memcpy(_addr,addr, sizeof(struct sockaddr));
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") bind to address:" << printAddress(_addr);
}
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
return false;
}
_last_rtp_time.resetTime();
bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len);
if(dts_out){
*dts_out = _dts;
}
return ret;
2019-12-05 19:20:12 +08:00
}
2020-03-08 21:19:20 +08:00
/**
 * Heuristically decides whether a payload looks like MPEG-TS.
 *
 * The payload is accepted when it is non-empty, its length is a whole number
 * of 188-byte packets and its first byte is the 0x47 sync byte.
 * @param packet Payload bytes; may be null only when bytes <= 0.
 * @param bytes Payload length in bytes.
 * @return true if the payload is judged to be TS, false otherwise.
 */
static inline bool checkTS(const uint8_t *packet, int bytes){
    // An empty payload satisfies "bytes % 188 == 0" but has no first byte to
    // inspect, so it must be rejected before packet[0] is read.
    if (packet == nullptr || bytes <= 0) {
        return false;
    }
    return bytes % 188 == 0 && packet[0] == 0x47;
}
2019-12-05 19:20:12 +08:00
/**
 * Handles one RTP packet after reordering.
 *
 * Warns when the sequence number does not directly follow the previous one,
 * optionally appends the packet to the RTP dump file and passes it on to
 * decodeRtp(). The first 4 bytes of the packet buffer are skipped in both
 * cases (presumably a transport prefix in front of the RTP header - confirm).
 * @param rtp The packet to process.
 */
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
    // RTP sequence numbers are 16 bits wide and wrap from 65535 back to 0.
    // "_sequence + 1" is evaluated as an int, so without truncating to 16 bits
    // the wrap-around packet (0 after 65535) would be reported as a gap.
    const uint16_t expected_seq = static_cast<uint16_t>(_sequence + 1);
    if (static_cast<uint16_t>(rtp->sequence) != expected_seq) {
        WarnL << rtp->sequence << " != " << _sequence << "+1";
    }
    _sequence = rtp->sequence;

    if (_save_file_rtp) {
        // Dump record: 2-byte length in network byte order, then the packet
        // bytes without the 4-byte prefix.
        uint16_t size = rtp->size() - 4;
        size = htons(size);
        fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get());
        fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get());
    }

    decodeRtp(rtp->data() + 4, rtp->size() - 4);
}
2020-03-08 21:19:20 +08:00
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) {
2019-12-05 19:20:12 +08:00
if(_save_file_ps){
fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
}
2020-03-06 13:00:06 +08:00
if(!_decoder){
//创建解码器
2020-03-08 21:19:20 +08:00
if(checkTS(packet, bytes)){
2020-03-06 13:00:06 +08:00
//猜测是ts负载
2020-03-08 22:10:37 +08:00
InfoL << "judged to be TS: " << printSSRC(_ssrc);
2020-03-06 13:00:06 +08:00
_decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{
//猜测是ps负载
2020-03-08 22:10:37 +08:00
InfoL << "judged to be PS: " << printSSRC(_ssrc);
2020-03-06 13:00:06 +08:00
_decoder = Decoder::createDecoder(Decoder::decoder_ps);
}
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
onDecode(stream,codecid,flags,pts,dts,data,bytes);
});
}
auto ret = _decoder->input((uint8_t *)packet,bytes);
2019-12-05 19:20:12 +08:00
if(ret != bytes){
WarnL << ret << " != " << bytes << " " << flags;
}
}
2020-02-25 19:00:22 +08:00
// Expands to a case label that returns the enumerator's own spelling as a string.
#define SWITCH_CASE(codec_id) case codec_id : return #codec_id
/**
 * Maps a stream codec id to a printable name for log messages.
 * @param codec_id One of the STREAM_* codec identifiers.
 * @return The identifier's name as text, or "unknown codec" for any other value.
 */
static const char *getCodecName(int codec_id) {
    switch (codec_id) {
        SWITCH_CASE(STREAM_VIDEO_MPEG4);
        SWITCH_CASE(STREAM_VIDEO_H264);
        SWITCH_CASE(STREAM_VIDEO_H265);
        SWITCH_CASE(STREAM_VIDEO_SVAC);
        SWITCH_CASE(STREAM_AUDIO_MP3);
        SWITCH_CASE(STREAM_AUDIO_AAC);
        SWITCH_CASE(STREAM_AUDIO_G711);
        SWITCH_CASE(STREAM_AUDIO_G722);
        SWITCH_CASE(STREAM_AUDIO_G723);
        SWITCH_CASE(STREAM_AUDIO_G729);
        SWITCH_CASE(STREAM_AUDIO_SVAC);
        default:
            break;
    }
    return "unknown codec";
}
2020-03-06 13:00:06 +08:00
void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes) {
2019-12-13 15:42:12 +08:00
pts /= 90;
dts /= 90;
_stamps[codecid].revise(dts,pts,dts,pts,false);
2020-02-25 18:53:11 +08:00
_dts = dts;
2019-12-05 19:20:12 +08:00
switch (codecid) {
case STREAM_VIDEO_H264: {
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoL << "got video track: H264";
auto track = std::make_shared<H264Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
2020-02-25 19:00:22 +08:00
WarnL << "video track change to H264 from codecid:" << getCodecName(_codecid_video);
2019-12-05 19:20:12 +08:00
return;
}
if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
}
auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, dts, pts,0);
_merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H264FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts,4));
});
2019-12-05 19:20:12 +08:00
break;
}
case STREAM_VIDEO_H265: {
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoL << "got video track: H265";
auto track = std::make_shared<H265Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
2020-02-25 19:00:22 +08:00
WarnL << "video track change to H265 from codecid:" << getCodecName(_codecid_video);
2019-12-05 19:20:12 +08:00
return;
}
if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
}
auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, dts, pts, 0);
_merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H265FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts, 4));
});
2019-12-05 19:20:12 +08:00
break;
}
case STREAM_AUDIO_AAC: {
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
InfoL << "got audio track: AAC";
auto track = std::make_shared<AACTrack>();
_muxer->addTrack(track);
}
if (codecid != _codecid_audio) {
2020-02-25 19:00:22 +08:00
WarnL << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio);
2019-12-05 19:20:12 +08:00
return;
}
2020-04-03 20:45:58 +08:00
_muxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char *) data, bytes, dts, 0, 7));
2019-12-05 19:20:12 +08:00
break;
}
default:
2020-02-25 19:00:22 +08:00
if(codecid != 0){
WarnL << "unsupported codec type:" << getCodecName(codecid);
}
2019-12-05 19:20:12 +08:00
return;
}
}
bool RtpProcess::alive() {
2019-12-06 11:54:10 +08:00
GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec)
2019-12-05 19:20:12 +08:00
if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){
return true;
}
return false;
}
/**
 * Returns the IP address of the bound peer in dotted-decimal form.
 * The stored address is interpreted as a sockaddr_in (IPv4).
 * @return The peer IP, or an empty string when no peer address has been
 *         recorded yet (_addr is only set once the first packet arrives).
 */
string RtpProcess::get_peer_ip() {
    // _addr stays null until inputRtp() has seen a packet; do not dereference it.
    if (!_addr) {
        return string();
    }
    return inet_ntoa(reinterpret_cast<struct sockaddr_in *>(_addr)->sin_addr);
}
/**
 * Returns the port of the bound peer in host byte order.
 * The stored address is interpreted as a sockaddr_in (IPv4).
 * @return The peer port, or 0 when no peer address has been recorded yet
 *         (_addr is only set once the first packet arrives).
 */
uint16_t RtpProcess::get_peer_port() {
    // _addr stays null until inputRtp() has seen a packet; do not dereference it.
    if (!_addr) {
        return 0;
    }
    return ntohs(reinterpret_cast<struct sockaddr_in *>(_addr)->sin_port);
}
2019-12-06 11:54:10 +08:00
2020-02-28 16:25:14 +08:00
/**
 * Returns the reader count reported by the underlying muxer.
 */
int RtpProcess::totalReaderCount() {
    const auto readers = _muxer->totalReaderCount();
    return readers;
}
/**
 * Installs the media-source event listener on the underlying muxer.
 * @param listener Weak reference to the listener; forwarded unchanged.
 */
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
    _muxer->setListener(listener);
}
2019-12-06 11:54:10 +08:00
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)