添加合并写功能

This commit is contained in:
xiongziliang 2020-04-09 16:19:03 +08:00
parent 8a8da61ef5
commit 99a55ddaaa
14 changed files with 279 additions and 149 deletions

View File

@ -42,6 +42,9 @@ publishToRtxp=1
publishToHls=1 publishToHls=1
#是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置 #是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置
publishToMP4=0 publishToMP4=0
#合并写缓存大小(单位毫秒)合并写指服务器缓存一定的数据后才会一次性写入socket这样能提高性能但是会提高延时
#在开启低延时模式后,该参数不起作用
mergeWriteMS=300
[hls] [hls]
#hls写文件的buf大小调整参数可以提高文件io性能 #hls写文件的buf大小调整参数可以提高文件io性能

View File

@ -450,4 +450,50 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
#endif //ENABLE_MP4 #endif //ENABLE_MP4
} }
static bool isFlushAble_default(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
if (new_stamp < last_stamp) {
//时间戳回退(可能seek中)
return true;
}
if (!is_audio) {
//这是视频,时间戳发送变化或者缓存超过1024个
return last_stamp != new_stamp || cache_size >= 1024;
}
//这是音频,缓存超过100ms或者缓存个数超过10个
return new_stamp > last_stamp + 100 || cache_size > 10;
}
static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) {
if (new_stamp < last_stamp) {
//时间戳回退(可能seek中)
return true;
}
if(new_stamp > last_stamp + merge_ms){
//时间戳增量超过合并写阈值
return true;
}
if (!is_audio) {
//这是视频,缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题
//而且sendmsg接口一般最多只能发送1024个数据包
return cache_size >= 1024;
}
//这是音频音频缓存超过20个
return cache_size > 20;
}
bool FlushPolicy::isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
GET_CONFIG(bool,ultraLowDelay, General::kUltraLowDelay);
GET_CONFIG(int,mergeWriteMS, General::kMergeWriteMS);
if(ultraLowDelay || mergeWriteMS <= 0){
//关闭了合并写或者合并写阈值小于等于0
return isFlushAble_default(_is_audio, last_stamp, new_stamp, cache_size);
}
return isFlushAble_merge(_is_audio, last_stamp, new_stamp, cache_size,mergeWriteMS);
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -21,6 +21,9 @@
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h" #include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h" #include "Extension/Track.h"
#include "Record/Recorder.h" #include "Record/Recorder.h"
@ -153,6 +156,114 @@ private:
static recursive_mutex g_mtxMediaSrc; static recursive_mutex g_mtxMediaSrc;
}; };
///缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy(bool is_audio) {
_is_audio = is_audio;
};
~FlushPolicy() = default;
uint32_t getStamp(const RtpPacket::Ptr &packet) {
return packet->timeStamp;
}
uint32_t getStamp(const RtmpPacket::Ptr &packet) {
return packet->timeStamp;
}
bool isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size);
private:
bool _is_audio;
};
/// 视频合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class VideoPacketCache {
public:
VideoPacketCache() : _policy(true) {
_cache = std::make_shared<packet_list>();
}
virtual ~VideoPacketCache() = default;
void inputVideo(const std::shared_ptr<packet> &rtp, bool key_pos) {
auto new_stamp = _policy.getStamp(rtp);
if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_stamp = new_stamp;
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void onFlushVideo(std::shared_ptr<packet_list> &, bool key_pos) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushVideo(_cache, _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
private:
policy _policy;
std::shared_ptr<packet_list> _cache;
uint32_t _last_stamp = 0;
bool _key_pos = false;
};
/// 音频频合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class AudioPacketCache {
public:
AudioPacketCache() : _policy(false) {
_cache = std::make_shared<packet_list>();
}
virtual ~AudioPacketCache() = default;
void inputAudio(const std::shared_ptr<packet> &rtp) {
auto new_stamp = _policy.getStamp(rtp);
if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_stamp = new_stamp;
}
virtual void onFlushAudio(std::shared_ptr<packet_list> &) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushAudio(_cache);
_cache = std::make_shared<packet_list>();
}
private:
policy _policy;
std::shared_ptr<packet_list> _cache;
uint32_t _last_stamp = 0;
};
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -67,6 +67,7 @@ const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay";
const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp"; const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp";
const string kPublishToHls = GENERAL_FIELD"publishToHls"; const string kPublishToHls = GENERAL_FIELD"publishToHls";
const string kPublishToMP4 = GENERAL_FIELD"publishToMP4"; const string kPublishToMP4 = GENERAL_FIELD"publishToMP4";
const string kMergeWriteMS = GENERAL_FIELD"mergeWriteMS";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kFlowThreshold] = 1024;
@ -79,6 +80,7 @@ onceToken token([](){
mINI::Instance()[kPublishToRtxp] = 1; mINI::Instance()[kPublishToRtxp] = 1;
mINI::Instance()[kPublishToHls] = 1; mINI::Instance()[kPublishToHls] = 1;
mINI::Instance()[kPublishToMP4] = 0; mINI::Instance()[kPublishToMP4] = 0;
mINI::Instance()[kMergeWriteMS] = 300;
},nullptr); },nullptr);
}//namespace General }//namespace General

View File

@ -173,6 +173,9 @@ extern const string kPublishToRtxp ;
extern const string kPublishToHls ; extern const string kPublishToHls ;
//是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置 //是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置
extern const string kPublishToMP4 ; extern const string kPublishToMP4 ;
//合并写缓存大小(单位毫秒)合并写指服务器缓存一定的数据后才会一次性写入socket这样能提高性能但是会提高延时
//在开启低延时模式后,该参数不起作用
extern const string kMergeWriteMS ;
}//namespace General }//namespace General

View File

@ -615,14 +615,17 @@ void HttpSession::setSocketFlags(){
} }
} }
void HttpSession::onWrite(const Buffer::Ptr &buffer) { void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) {
if(flush){
//需要flush那么一次刷新缓存
HttpSession::setSendFlushFlag(true);
}
_ticker.resetTime(); _ticker.resetTime();
if(!_flv_over_websocket){ if(!_flv_over_websocket){
_ui64TotalBytes += buffer->size(); _ui64TotalBytes += buffer->size();
send(buffer); send(buffer);
return; }else{
}
WebSocketHeader header; WebSocketHeader header;
header._fin = true; header._fin = true;
header._reserved = 0; header._reserved = 0;
@ -631,6 +634,12 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) {
WebSocketSplitter::encode(header,buffer); WebSocketSplitter::encode(header,buffer);
} }
if(flush){
//本次刷新缓存后,下次不用刷新缓存
HttpSession::setSendFlushFlag(false);
}
}
void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){ void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){
_ui64TotalBytes += buffer->size(); _ui64TotalBytes += buffer->size();
send(buffer); send(buffer);

View File

@ -49,7 +49,7 @@ public:
static string urlDecode(const string &str); static string urlDecode(const string &str);
protected: protected:
//FlvMuxer override //FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ; void onWrite(const Buffer::Ptr &data, bool flush) override ;
void onDetach() override; void onDetach() override;
std::shared_ptr<FlvMuxer> getSharedPtr() override; std::shared_ptr<FlvMuxer> getSharedPtr() override;

View File

@ -50,12 +50,17 @@ void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &
} }
strongSelf->onDetach(); strongSelf->onDetach();
}); });
_ring_reader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->onWriteRtmp(pkt);
int i = 0;
int size = pkt->size();
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
strongSelf->onWriteRtmp(rtmp, ++i == size);
});
}); });
} }
@ -84,11 +89,11 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
} }
//flv header //flv header
onWrite(std::make_shared<BufferRaw>(flv_file_header, sizeof(flv_file_header) - 1)); onWrite(std::make_shared<BufferRaw>(flv_file_header, sizeof(flv_file_header) - 1), false);
auto size = htonl(0); auto size = htonl(0);
//PreviousTagSize0 Always 0 //PreviousTagSize0 Always 0
onWrite(std::make_shared<BufferRaw>((char *)&size,4)); onWrite(std::make_shared<BufferRaw>((char *)&size,4), false);
auto &metadata = mediaSrc->getMetaData(); auto &metadata = mediaSrc->getMetaData();
@ -97,12 +102,12 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
//其实metadata没什么用有些推流器不产生metadata //其实metadata没什么用有些推流器不产生metadata
AMFEncoder invoke; AMFEncoder invoke;
invoke << "onMetaData" << metadata; invoke << "onMetaData" << metadata;
onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0); onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0, false);
} }
//config frame //config frame
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
onWriteRtmp(pkt); onWriteRtmp(pkt, true);
}); });
} }
@ -125,29 +130,29 @@ public:
#pragma pack(pop) #pragma pack(pop)
#endif // defined(_WIN32) #endif // defined(_WIN32)
void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) { void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp , bool flush) {
onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp); onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp, flush);
} }
void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp) { void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush) {
RtmpTagHeader header; RtmpTagHeader header;
header.type = ui8Type; header.type = ui8Type;
set_be24(header.data_size, buffer->size()); set_be24(header.data_size, buffer->size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
//tag header //tag header
onWrite(std::make_shared<BufferRaw>((char *)&header, sizeof(header))); onWrite(std::make_shared<BufferRaw>((char *)&header, sizeof(header)), false);
//tag data //tag data
onWrite(buffer); onWrite(buffer, false);
auto size = htonl((buffer->size() + sizeof(header))); auto size = htonl((buffer->size() + sizeof(header)));
//PreviousTagSize //PreviousTagSize
onWrite(std::make_shared<BufferRaw>((char *)&size,4)); onWrite(std::make_shared<BufferRaw>((char *)&size,4), flush);
} }
void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) { void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush) {
int64_t dts_out; int64_t dts_out;
_stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out); _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out);
onWriteFlvTag(pkt, dts_out); onWriteFlvTag(pkt, dts_out,flush);
} }
void FlvMuxer::stop() { void FlvMuxer::stop() {
@ -187,7 +192,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSour
start(poller,media); start(poller,media);
} }
void FlvRecorder::onWrite(const Buffer::Ptr &data) { void FlvRecorder::onWrite(const Buffer::Ptr &data, bool flush) {
lock_guard<recursive_mutex> lck(_file_mtx); lock_guard<recursive_mutex> lck(_file_mtx);
if(_file){ if(_file){
fwrite(data->data(),data->size(),1,_file.get()); fwrite(data->data(),data->size(),1,_file.get());

View File

@ -27,14 +27,14 @@ public:
void stop(); void stop();
protected: protected:
void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media); void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media);
virtual void onWrite(const Buffer::Ptr &data) = 0; virtual void onWrite(const Buffer::Ptr &data, bool flush) = 0;
virtual void onDetach() = 0; virtual void onDetach() = 0;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() = 0; virtual std::shared_ptr<FlvMuxer> getSharedPtr() = 0;
private: private:
void onWriteFlvHeader(const RtmpMediaSource::Ptr &media); void onWriteFlvHeader(const RtmpMediaSource::Ptr &media);
void onWriteRtmp(const RtmpPacket::Ptr &pkt); void onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush);
void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp); void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp, bool flush);
void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp); void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush);
private: private:
RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; RtmpMediaSource::RingType::RingReader::Ptr _ring_reader;
//时间戳修整器 //时间戳修整器
@ -50,7 +50,7 @@ public:
void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path); void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path);
void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path); void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path);
private: private:
virtual void onWrite(const Buffer::Ptr &data) override ; virtual void onWrite(const Buffer::Ptr &data, bool flush) override ;
virtual void onDetach() override; virtual void onDetach() override;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() override; virtual std::shared_ptr<FlvMuxer> getSharedPtr() override;
private: private:

View File

@ -33,6 +33,9 @@ using namespace toolkit;
#define RTMP_GOP_SIZE 512 #define RTMP_GOP_SIZE 512
namespace mediakit { namespace mediakit {
typedef VideoPacketCache<RtmpPacket> RtmpVideoCache;
typedef AudioPacketCache<RtmpPacket> RtmpAudioCache;
/** /**
* rtmp媒体源的数据抽象 * rtmp媒体源的数据抽象
* rtmp有关键的三要素metadataconfig帧 * rtmp有关键的三要素metadataconfig帧
@ -40,10 +43,11 @@ namespace mediakit {
* rtmp推流rtmp服务器就很简单了 * rtmp推流rtmp服务器就很简单了
* rtmp推拉流协议中metadataconfig帧 * rtmp推拉流协议中metadataconfig帧
*/ */
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr> { class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr>, public RtmpVideoCache, public RtmpAudioCache{
public: public:
typedef std::shared_ptr<RtmpMediaSource> Ptr; typedef std::shared_ptr<RtmpMediaSource> Ptr;
typedef RingBuffer<RtmpPacket::Ptr> RingType; typedef std::shared_ptr<List<RtmpPacket::Ptr> > RingDataType;
typedef RingBuffer<RingDataType> RingType;
/** /**
* *
@ -122,6 +126,9 @@ public:
return; return;
} }
//保存当前时间戳
_track_stamps_map[pkt->typeId] = pkt->timeStamp;
if (!_ring) { if (!_ring) {
weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this()); weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
@ -142,9 +149,12 @@ public:
regist(); regist();
} }
} }
_track_stamps_map[pkt->typeId] = pkt->timeStamp;
//不存在视频为了减少缓存延时那么关闭GOP缓存 if(pkt->typeId == MSG_VIDEO){
_ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true); RtmpVideoCache::inputVideo(pkt, key);
}else{
RtmpAudioCache::inputAudio(pkt);
}
} }
/** /**
@ -163,6 +173,25 @@ public:
} }
private: private:
/**
* flush时间戳相同的视频rtmp包时触发该函数
* @param rtmp_list rtmp包列表
* @param key_pos
*/
void onFlushVideo(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list, bool key_pos) override {
_ring->write(rtmp_list, key_pos);
}
/**
* flush一定数量的音频rtmp包时触发该函数
* @param rtmp_list rtmp包列表
*/
void onFlushAudio(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list) override{
//只有音频的话就不存在gop缓存的意义
_ring->write(rtmp_list, !_have_video);
}
/** /**
* *
*/ */
@ -177,7 +206,7 @@ private:
bool _have_video = false; bool _have_video = false;
mutable recursive_mutex _mtx; mutable recursive_mutex _mtx;
AMFValue _metadata; AMFValue _metadata;
RingBuffer<RtmpPacket::Ptr>::Ptr _ring; RingType::Ptr _ring;
unordered_map<int, uint32_t> _track_stamps_map; unordered_map<int, uint32_t> _track_stamps_map;
unordered_map<int, RtmpPacket::Ptr> _config_frame_map; unordered_map<int, RtmpPacket::Ptr> _config_frame_map;
}; };

View File

@ -200,12 +200,21 @@ inline void RtmpPusher::send_metaData(){
_pRtmpReader = src->getRing()->attach(getPoller()); _pRtmpReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ _pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId);
int i = 0;
int size = pkt->size();
strongSelf->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){
strongSelf->setSendFlushFlag(true);
}
strongSelf->sendRtmp(rtmp->typeId, strongSelf->_ui32StreamId, rtmp, rtmp->timeStamp, rtmp->chunkId);
});
}); });
_pRtmpReader->setDetachCB([weakSelf](){ _pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();

View File

@ -272,12 +272,23 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
_pRingReader = src->getRing()->attach(getPoller()); _pRingReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { _pRingReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->onSendMedia(pkt); if(strongSelf->_paused){
return;
}
int i = 0;
int size = pkt->size();
strongSelf->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){
strongSelf->setSendFlushFlag(true);
}
strongSelf->onSendMedia(rtmp);
});
}); });
_pRingReader->setDetachCB([weakSelf]() { _pRingReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
@ -394,23 +405,8 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) {
status.set("description", paused ? "Paused stream." : "Unpaused stream."); status.set("description", paused ? "Paused stream." : "Unpaused stream.");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
//streamBegin //streamBegin
sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA);
STREAM_MEDIA); _paused = paused;
if (!_pRingReader) {
throw std::runtime_error("Rtmp not started yet!");
}
if (paused) {
_pRingReader->setReadCB(nullptr);
} else {
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
}
} }
void RtmpSession::setMetaData(AMFDecoder &dec) { void RtmpSession::setMetaData(AMFDecoder &dec) {

View File

@ -80,13 +80,14 @@ private:
double _dNowReqID = 0; double _dNowReqID = 0;
bool _set_meta_data = false; bool _set_meta_data = false;
Ticker _ticker;//数据接收时间 Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader; RtmpMediaSource::RingType::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc; std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc; std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
//时间戳修整器 //时间戳修整器
Stamp _stamp[2]; Stamp _stamp[2];
//消耗的总流量 //消耗的总流量
uint64_t _ui64TotalBytes = 0; uint64_t _ui64TotalBytes = 0;
bool _paused = false;
}; };

View File

@ -30,92 +30,8 @@ using namespace toolkit;
#define RTP_GOP_SIZE 512 #define RTP_GOP_SIZE 512
namespace mediakit { namespace mediakit {
class RtpVideoCache { typedef VideoPacketCache<RtpPacket> RtpVideoCache;
public: typedef AudioPacketCache<RtpPacket> RtpAudioCache;
RtpVideoCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpVideoCache() = default;
void inputVideoRtp(const RtpPacket::Ptr &rtp, bool key_pos) {
if (_last_rtp_stamp != rtp->timeStamp) {
//时间戳发生变化了
flushAll();
} else if (_cache->size() > RTP_GOP_SIZE) {
//这个逻辑用于避免时间戳异常的流导致的内存暴增问题
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &, bool key_pos) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushVideoRtp(_cache, _key_pos);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
_key_pos = false;
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
bool _key_pos = false;
};
class RtpAudioCache {
public:
RtpAudioCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpAudioCache() = default;
void inputAudioRtp(const RtpPacket::Ptr &rtp) {
if (rtp->timeStamp > _last_rtp_stamp + 100) {
//累积了100ms的音频数据
flushAll();
} else if (_cache->size() > 10) {
//或者audio rtp缓存超过10个
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
}
virtual void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushAudioRtp(_cache);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
};
/** /**
* rtsp媒体源的数据抽象 * rtsp媒体源的数据抽象
@ -261,9 +177,9 @@ public:
} }
if(rtp->type == TrackVideo){ if(rtp->type == TrackVideo){
RtpVideoCache::inputVideoRtp(rtp, keyPos); RtpVideoCache::inputVideo(rtp, keyPos);
}else{ }else{
RtpAudioCache::inputAudioRtp(rtp); RtpAudioCache::inputAudio(rtp);
} }
} }
@ -274,7 +190,7 @@ private:
* @param rtp_list rtp包列表 * @param rtp_list rtp包列表
* @param key_pos * @param key_pos
*/ */
void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override { void onFlushVideo(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
_ring->write(rtp_list, key_pos); _ring->write(rtp_list, key_pos);
} }
@ -282,7 +198,7 @@ private:
* flush一定数量的音频rtp包时触发该函数 * flush一定数量的音频rtp包时触发该函数
* @param rtp_list rtp包列表 * @param rtp_list rtp包列表
*/ */
void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{ void onFlushAudio(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{
//只有音频的话就不存在gop缓存的意义 //只有音频的话就不存在gop缓存的意义
_ring->write(rtp_list, !_have_video); _ring->write(rtp_list, !_have_video);
} }