diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index f922d96f..dd8928dd 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit f922d96f98108186e8d9bb9b468c83c0b37d9558 +Subproject commit dd8928ddaecb71cbb5da447dbaf5af29b24b2e7d diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 8dd0c929..15770a81 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -28,152 +28,164 @@ #include "RtpReceiver.h" #define POP_HEAD(trackidx) \ - auto it = _amapRtpSort[trackidx].begin(); \ + auto it = _rtp_sort_cache_map[trackidx].begin(); \ onRtpSorted(it->second, trackidx); \ - _amapRtpSort[trackidx].erase(it); + _rtp_sort_cache_map[trackidx].erase(it); -# define AV_RB16(x) \ +#define AV_RB16(x) \ ((((const uint8_t*)(x))[0] << 8) | \ ((const uint8_t*)(x))[1]) +#define RTP_MAX_SIZE (10 * 1024) namespace mediakit { RtpReceiver::RtpReceiver() {} RtpReceiver::~RtpReceiver() {} -bool RtpReceiver::handleOneRtp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen) { - auto pt_ptr=_pktPool.obtain(); - auto &rtppt=*pt_ptr; - auto length = uiLen + 4; +bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { + auto rtp_ptr = _rtp_pool.obtain(); + auto &rtp = *rtp_ptr; + auto length = rtp_raw_len + 4; - rtppt.interleaved = track->_interleaved; - rtppt.mark = pucData[1] >> 7; - rtppt.PT = pucData[1] & 0x7F; + rtp.interleaved = track->_interleaved; + rtp.mark = rtp_raw_ptr[1] >> 7; + rtp.PT = rtp_raw_ptr[1] & 0x7F; //序列号 - memcpy(&rtppt.sequence,pucData+2,2);//内存对齐 - rtppt.sequence = ntohs(rtppt.sequence); + memcpy(&rtp.sequence,rtp_raw_ptr+2,2);//内存对齐 + rtp.sequence = ntohs(rtp.sequence); //时间戳 - memcpy(&rtppt.timeStamp, pucData+4, 4);//内存对齐 + memcpy(&rtp.timeStamp, rtp_raw_ptr+4, 4);//内存对齐 if(!track->_samplerate){ + //无法把时间戳转换成毫秒 return false; } //时间戳转换成毫秒 - rtppt.timeStamp = ntohl(rtppt.timeStamp) * 1000LL / track->_samplerate; + rtp.timeStamp = ntohl(rtp.timeStamp) * 1000LL / track->_samplerate; //ssrc - memcpy(&rtppt.ssrc,pucData+8,4);//内存对齐 - rtppt.ssrc = ntohl(rtppt.ssrc); - rtppt.type = track->_type; - if (track->_ssrc == 0) { - track->_ssrc = rtppt.ssrc; - //保存SSRC - } else if (track->_ssrc != rtppt.ssrc) { - //ssrc错误 - WarnL << "ssrc错误:" << rtppt.ssrc << " != " << track->_ssrc; - if (_aui32SsrcErrorCnt[iTrackidx]++ > 10) { - //ssrc切换后清除老数据 - WarnL << "ssrc更换:" << track->_ssrc << " -> " << rtppt.ssrc; - _amapRtpSort[iTrackidx].clear(); - track->_ssrc = rtppt.ssrc; - } - return false; - } - _aui32SsrcErrorCnt[iTrackidx] = 0; + memcpy(&rtp.ssrc,rtp_raw_ptr+8,4);//内存对齐 + rtp.ssrc = ntohl(rtp.ssrc); + rtp.type = track->_type; - //获取rtp中媒体数据偏移量 - rtppt.offset = 12 + 4; - int csrc = pucData[0] & 0x0f; - int ext = pucData[0] & 0x10; - rtppt.offset += 4 * csrc; - if (ext) { - if(uiLen < rtppt.offset){ + if (track->_ssrc != rtp.ssrc) { + if (track->_ssrc == 0) { + //保存SSRC至track对象 + track->_ssrc = rtp.ssrc; + }else{ + //ssrc错误 + WarnL << "ssrc错误:" << rtp.ssrc << " != " << track->_ssrc; + if (_ssrc_err_count[track_index]++ > 10) { + //ssrc切换后清除老数据 + WarnL << "ssrc更换:" << track->_ssrc << " -> " << rtp.ssrc; + _rtp_sort_cache_map[track_index].clear(); + track->_ssrc = rtp.ssrc; + } return false; } - /* calculate the header extension length (stored as number of 32-bit words) */ - ext = (AV_RB16(pucData + rtppt.offset - 2) + 1) << 2; - rtppt.offset += ext; } - if(length - rtppt.offset <= 0){ - WarnL << "无有效负载的rtp包:" << length << "<=" << (int)rtppt.offset; + //ssrc匹配正确,不匹配计数清零 + _ssrc_err_count[track_index] = 0; + + //获取rtp中媒体数据偏移量 + rtp.offset = 12 + 4; + int csrc = rtp_raw_ptr[0] & 0x0f; + int ext = rtp_raw_ptr[0] & 0x10; + rtp.offset += 4 * csrc; + if (ext && rtp_raw_len >= rtp.offset) { + /* calculate the header extension length (stored as number of 32-bit words) */ + ext = (AV_RB16(rtp_raw_ptr + rtp.offset - 2) + 1) << 2; + rtp.offset += ext; + } + + if(length <= rtp.offset){ + WarnL << "无有效负载的rtp包:" << length << "<=" << (int)rtp.offset; return false; } - rtppt.setCapacity(length); - rtppt.setSize(length); - uint8_t *payload_ptr = (uint8_t *)rtppt.data(); + if(length > RTP_MAX_SIZE){ + WarnL << "超大的rtp包:" << length << ">" << RTP_MAX_SIZE; + return false; + } + + //设置rtp负载长度 + rtp.setCapacity(length); + rtp.setSize(length); + uint8_t *payload_ptr = (uint8_t *)rtp.data(); payload_ptr[0] = '$'; - payload_ptr[1] = rtppt.interleaved; - payload_ptr[2] = uiLen >> 8; - payload_ptr[3] = (uiLen & 0x00FF); + payload_ptr[1] = rtp.interleaved; + payload_ptr[2] = rtp_raw_len >> 8; + payload_ptr[3] = (rtp_raw_len & 0x00FF); //拷贝rtp负载 - memcpy(payload_ptr + 4, pucData, uiLen); - - /////////////////////////////////RTP排序逻辑/////////////////////////////////// - if(rtppt.sequence != _aui16LastSeq[iTrackidx] + 1 && _aui16LastSeq[iTrackidx] != 0){ - //包乱序或丢包 - _aui32SeqOkCnt[iTrackidx] = 0; - _abSortStarted[iTrackidx] = true; -// WarnL << "包乱序或丢包:" << iTrackidx <<" " << rtppt.sequence << " " << _aui16LastSeq[iTrackidx]; - if(_aui16LastSeq[iTrackidx] > rtppt.sequence && _aui16LastSeq[iTrackidx] - rtppt.sequence > 0xFF){ - //sequence回环,清空所有排序缓存 - while (_amapRtpSort[iTrackidx].size()) { - POP_HEAD(iTrackidx) - } - ++_clcyeCount[iTrackidx]; - } - }else{ - //正确序列的包 - _aui32SeqOkCnt[iTrackidx]++; - } - _aui16LastSeq[iTrackidx] = rtppt.sequence; - - //开始排序缓存 - if (_abSortStarted[iTrackidx]) { - _amapRtpSort[iTrackidx].emplace(rtppt.sequence, pt_ptr); - GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount); - GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount); - if (_aui32SeqOkCnt[iTrackidx] >= clearCount) { - //网络环境改善,需要清空排序缓存 - _aui32SeqOkCnt[iTrackidx] = 0; - _abSortStarted[iTrackidx] = false; - while (_amapRtpSort[iTrackidx].size()) { - POP_HEAD(iTrackidx) - } - } else if (_amapRtpSort[iTrackidx].size() >= maxRtpCount) { - //排序缓存溢出 - POP_HEAD(iTrackidx) - } - }else{ - //正确序列 - onRtpSorted(pt_ptr, iTrackidx); - } - ////////////////////////////////////////////////////////////////////////////////// + memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len); + //排序rtp + sortRtp(rtp_ptr,track_index); return true; } -void RtpReceiver::clear() { - CLEAR_ARR(_aui16LastSeq) - CLEAR_ARR(_aui32SsrcErrorCnt) - CLEAR_ARR(_aui32SeqOkCnt) - CLEAR_ARR(_abSortStarted) - CLEAR_ARR(_clcyeCount) +void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ + if(rtp->sequence != _last_seq[track_index] + 1 && _last_seq[track_index] != 0){ + //包乱序或丢包 + _seq_ok_count[track_index] = 0; + _sort_started[track_index] = true; + if(_last_seq[track_index] > rtp->sequence && _last_seq[track_index] - rtp->sequence > 0xFF){ + //sequence回环,清空所有排序缓存 + while (_rtp_sort_cache_map[track_index].size()) { + POP_HEAD(track_index) + } + ++_seq_cycle_count[track_index]; + } + }else{ + //正确序列的包 + _seq_ok_count[track_index]++; + } - _amapRtpSort[0].clear(); - _amapRtpSort[1].clear(); + _last_seq[track_index] = rtp->sequence; + + //开始排序缓存 + if (_sort_started[track_index]) { + _rtp_sort_cache_map[track_index].emplace(rtp->sequence, rtp); + GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount); + GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount); + if (_seq_ok_count[track_index] >= clearCount) { + //网络环境改善,需要清空排序缓存 + _seq_ok_count[track_index] = 0; + _sort_started[track_index] = false; + while (_rtp_sort_cache_map[track_index].size()) { + POP_HEAD(track_index) + } + } else if (_rtp_sort_cache_map[track_index].size() >= maxRtpCount) { + //排序缓存溢出 + POP_HEAD(track_index) + } + }else{ + //正确序列 + onRtpSorted(rtp, track_index); + } +} + +void RtpReceiver::clear() { + CLEAR_ARR(_last_seq) + CLEAR_ARR(_ssrc_err_count) + CLEAR_ARR(_seq_ok_count) + CLEAR_ARR(_sort_started) + CLEAR_ARR(_seq_cycle_count) + + _rtp_sort_cache_map[0].clear(); + _rtp_sort_cache_map[1].clear(); } void RtpReceiver::setPoolSize(int size) { - _pktPool.setSize(size); + _rtp_pool.setSize(size); } int RtpReceiver::getJitterSize(int iTrackidx){ - return _amapRtpSort[iTrackidx].size(); + return _rtp_sort_cache_map[iTrackidx].size(); } int RtpReceiver::getCycleCount(int iTrackidx){ - return _clcyeCount[iTrackidx]; + return _seq_cycle_count[iTrackidx]; } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 06d1b45b..9bea72cb 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -48,33 +48,41 @@ protected: /** * 输入数据指针生成并排序rtp包 - * @param iTrackidx track下标索引 + * @param track_index track下标索引 * @param track sdp track相关信息 - * @param pucData rtp数据指针 - * @param uiLen rtp数据指针长度 + * @param rtp_raw_ptr rtp数据指针 + * @param rtp_raw_len rtp数据指针长度 * @return 解析成功返回true */ - bool handleOneRtp(int iTrackidx,SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); + bool handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len); /** * rtp数据包排序后输出 * @param rtppt rtp数据包 * @param trackidx track索引 */ - virtual void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){} + virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int trackidx){} void clear(); void setPoolSize(int size); int getJitterSize(int iTrackidx); int getCycleCount(int iTrackidx); private: - uint32_t _aui32SsrcErrorCnt[2] = { 0, 0 }; - /* RTP包排序所用参数 */ - uint16_t _aui16LastSeq[2] = { 0 , 0 }; - uint32_t _aui32SeqOkCnt[2] = { 0 , 0}; - uint32_t _clcyeCount[2] = { 0 , 0}; - bool _abSortStarted[2] = { 0 , 0}; - map _amapRtpSort[2]; - RtspMediaSource::PoolType _pktPool; + void sortRtp(const RtpPacket::Ptr &rtp , int track_index); +private: + //ssrc不匹配计数 + uint32_t _ssrc_err_count[2] = { 0, 0 }; + //上次seq + uint16_t _last_seq[2] = { 0 , 0 }; + //seq连续次数计数 + uint32_t _seq_ok_count[2] = { 0 , 0}; + //seq回环次数计数 + uint32_t _seq_cycle_count[2] = { 0 , 0}; + //是否开始seq排序 + bool _sort_started[2] = { 0 , 0}; + //rtp排序缓存,根据seq排序 + map _rtp_sort_cache_map[2]; + //rtp循环池 + RtspMediaSource::PoolType _rtp_pool; }; }//namespace mediakit