rtp服务器支持opus推流

This commit is contained in:
ziyue 2021-06-21 20:32:13 +08:00
parent c90f348801
commit 510d908bc4

View File

@ -15,6 +15,7 @@
#include "Extension/CommonRtp.h" #include "Extension/CommonRtp.h"
#include "Extension/H264Rtp.h" #include "Extension/H264Rtp.h"
#include "Extension/Factory.h" #include "Extension/Factory.h"
#include "Extension/Opus.h"
namespace mediakit{ namespace mediakit{
@ -26,15 +27,16 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){
class RtpReceiverImp : public RtpReceiver { class RtpReceiverImp : public RtpReceiver {
public: public:
using Ptr = std::shared_ptr<RtpReceiverImp>; using Ptr = std::shared_ptr<RtpReceiverImp>;
RtpReceiverImp( function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr){ RtpReceiverImp(int sample_rate, function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr){
_sample_rate = sample_rate;
_on_sort = std::move(cb); _on_sort = std::move(cb);
_on_before_sort = std::move(cb_before); _on_before_sort = std::move(cb_before);
} }
~RtpReceiverImp() override = default; ~RtpReceiverImp() override = default;
bool inputRtp(TrackType type, int samplerate, uint8_t *ptr, size_t len){ bool inputRtp(TrackType type, uint8_t *ptr, size_t len){
return handleOneRtp((int) type, type, samplerate, ptr, len); return handleOneRtp((int) type, type, _sample_rate, ptr, len);
} }
protected: protected:
@ -49,6 +51,7 @@ protected:
} }
private: private:
int _sample_rate;
function<void(RtpPacket::Ptr rtp)> _on_sort; function<void(RtpPacket::Ptr rtp)> _on_sort;
function<void(const RtpPacket::Ptr &rtp)> _on_before_sort; function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
}; };
@ -63,49 +66,66 @@ GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *
GB28181Process::~GB28181Process() {} GB28181Process::~GB28181Process() {}
bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) {
RtpHeader *header = (RtpHeader *) data; _rtp_decoder[rtp->getHeader()->pt]->inputRtp(rtp, false);
auto &ref = _rtp_receiver[header->pt];
if (!ref) {
ref = std::make_shared<RtpReceiverImp>([this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
}
return ref->inputRtp(TrackVideo, RtpPayload::getClockRate(header->pt), (unsigned char *) data, data_len);
} }
void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) { bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
auto pt = rtp->getHeader()->pt; RtpHeader *header = (RtpHeader *) data;
auto &ref = _rtp_decoder[pt]; auto pt = header->pt;
auto &ref = _rtp_receiver[pt];
if (!ref) { if (!ref) {
if (_rtp_receiver.size() > 2) {
//防止pt类型太多导致内存溢出
throw std::invalid_argument("rtp pt类型不得超过2种!");
}
switch (pt) { switch (pt) {
case 100: {
//opus负载
ref = std::make_shared<RtpReceiverImp>(48000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<OpusTrack>();
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
case 99: { case 99: {
//H265负载 //H265负载
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<H265Track>(); auto track = std::make_shared<H265Track>();
_interface->addTrack(track); _interface->addTrack(track);
ref = Factory::getRtpDecoderByTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break; break;
} }
case 98: { case 98: {
//H264负载 //H264负载
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<H264Track>(); auto track = std::make_shared<H264Track>();
_interface->addTrack(track); _interface->addTrack(track);
ref = Factory::getRtpDecoderByTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
case 0: {
//CodecG711U
auto track = std::make_shared<G711Track>(CodecG711U, 8000, 1, 16);
_interface->addTrack(track);
ref = Factory::getRtpDecoderByTrack(track);
break; break;
} }
case 0:
//CodecG711U
case 8: { case 8: {
//CodecG711A //CodecG711A
auto track = std::make_shared<G711Track>(CodecG711A, 8000, 1, 16); ref = std::make_shared<RtpReceiverImp>(8000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<G711Track>(pt == 0 ? CodecG711U : CodecG711A, 8000, 1, 16);
_interface->addTrack(track); _interface->addTrack(track);
ref = Factory::getRtpDecoderByTrack(track); _rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break; break;
} }
@ -113,8 +133,13 @@ void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) {
if (pt != 33 && pt != 96) { if (pt != 33 && pt != 96) {
WarnL << "rtp payload type未识别(" << (int) pt << "),已按ts或ps负载处理"; WarnL << "rtp payload type未识别(" << (int) pt << "),已按ts或ps负载处理";
} }
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
//ts或ps负载 //ts或ps负载
ref = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024); _rtp_decoder[pt] = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024);
//设置dump目录 //设置dump目录
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
if (!dump_dir.empty()) { if (!dump_dir.empty()) {
@ -130,13 +155,12 @@ void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) {
} }
//设置frame回调 //设置frame回调
ref->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) { _rtp_decoder[pt]->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
onRtpDecode(frame); onRtpDecode(frame);
})); }));
} }
//解码rtp return ref->inputRtp(TrackVideo, (unsigned char *) data, data_len);
ref->inputRtp(rtp, false);
} }
const char *GB28181Process::onSearchPacketTail(const char *packet,size_t bytes){ const char *GB28181Process::onSearchPacketTail(const char *packet,size_t bytes){