rtp使用ntp时间戳作为时间戳,用于实现rtsp音视频同步

This commit is contained in:
ziyue 2021-07-12 21:18:22 +08:00
parent eba3758b30
commit 298f6e3864
17 changed files with 184 additions and 29 deletions

View File

@ -35,7 +35,8 @@ int64_t DeltaStamp::deltaStamp(int64_t stamp) {
//时间戳增量为负,说明时间戳回环了或回退了
_last_stamp = stamp;
return 0;
//如果时间戳回退不多,那么返回负值
return -ret < MAX_CTS ? ret : 0;
}
void Stamp::setPlayBack(bool playback) {
@ -215,4 +216,32 @@ bool DtsGenerator::getDts_l(uint32_t pts, uint32_t &dts){
return false;
}
void NtpStamp::setNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate, uint64_t ntp_stamp_ms) {
_rtp_stamp_ms = uint64_t(rtp_stamp) * 1000 / sample_rate;
_ntp_stamp_ms = ntp_stamp_ms;
}
uint64_t NtpStamp::getNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate) {
uint64_t rtp_stamp_ms = uint64_t(rtp_stamp) * 1000 / sample_rate;
if (!_rtp_stamp_ms && !_ntp_stamp_ms) {
return rtp_stamp_ms;
}
if (rtp_stamp_ms > _rtp_stamp_ms) {
//时间戳正常增长
_last_ret = _ntp_stamp_ms + (rtp_stamp_ms - _rtp_stamp_ms);
return _last_ret;
}
if (_rtp_stamp_ms - rtp_stamp_ms < 10 * 1000) {
//小于10秒的时间戳回退说明收到rtp乱序了
return _ntp_stamp_ms - (_rtp_stamp_ms - rtp_stamp_ms);
}
uint64_t max_rtp_ms = uint64_t(UINT32_MAX) * 1000 / sample_rate;
if (rtp_stamp_ms < 60 * 1000 && _rtp_stamp_ms > max_rtp_ms - 60 * 1000) {
//确定是时间戳溢出
return _ntp_stamp_ms + rtp_stamp_ms + (max_rtp_ms - _rtp_stamp_ms);
}
//不明原因的时间戳回退,直接返回上次值
return _last_ret;
}
}//namespace mediakit

View File

@ -114,6 +114,20 @@ private:
set<uint32_t> _pts_sorter;
};
class NtpStamp {
public:
NtpStamp() = default;
~NtpStamp() = default;
void setNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate, uint64_t ntp_stamp_ms);
uint64_t getNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate);
private:
uint64_t _rtp_stamp_ms = 0;
uint64_t _ntp_stamp_ms = 0;
uint64_t _last_ret = 0;
};
}//namespace mediakit
#endif //ZLMEDIAKIT_STAMP_H

View File

@ -22,7 +22,7 @@ RtcpContext::RtcpContext(bool is_receiver) {
_is_receiver = is_receiver;
}
void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, uint32_t sample_rate, size_t bytes) {
void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t sample_rate, size_t bytes) {
if (_is_receiver) {
//接收者才做复杂的统计运算
auto sys_stamp = getCurrentMillisecond();
@ -65,6 +65,7 @@ void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, uint32_t sample_rate, size
++_packets;
_bytes += bytes;
_last_rtp_stamp = stamp;
_last_ntp_stamp_ms = ntp_stamp_ms;
}
void RtcpContext::onRtcp(RtcpHeader *rtcp) {
@ -154,7 +155,7 @@ Buffer::Ptr RtcpContext::createRtcpSR(uint32_t rtcp_ssrc) {
throw std::runtime_error("rtp接收者尝试发送sr包");
}
auto rtcp = RtcpSR::create(0);
rtcp->setNtpStamp(getCurrentMillisecond(true));
rtcp->setNtpStamp(_last_ntp_stamp_ms);
rtcp->rtpts = htonl(_last_rtp_stamp);
rtcp->ssrc = htonl(rtcp_ssrc);
rtcp->packet_count = htonl((uint32_t) _packets);

View File

@ -30,10 +30,11 @@ public:
* rtp时调用
* @param seq rtp的seq
* @param stamp rtp的时间戳()
* @param ntp_stamp_ms ntp时间戳
* @param rtp rtp时间戳采样率90000
* @param bytes rtp数据长度
*/
void onRtp(uint16_t seq, uint32_t stamp, uint32_t sample_rate, size_t bytes);
void onRtp(uint16_t seq, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t sample_rate, size_t bytes);
/**
* sr rtcp包
@ -110,6 +111,7 @@ private:
uint16_t _last_rtp_seq = 0;
//上次的rtp时间戳,毫秒
uint32_t _last_rtp_stamp = 0;
uint64_t _last_ntp_stamp_ms = 0;
//上次的rtp的系统时间戳(毫秒)用于统计抖动
uint64_t _last_rtp_sys_stamp = 0;
//上次统计的丢包总数

View File

@ -35,7 +35,7 @@ public:
void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){
//统计rtp接受情况用于发送rr包
auto header = (RtpHeader *) buf->data();
onRtp(ntohs(header->seq), ntohl(header->stamp), _sample_rate, buf->size());
onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , _sample_rate, buf->size());
sendRtcp(ntohl(header->ssrc), addr, addr_len);
}

View File

@ -91,12 +91,19 @@ bool RtpTrack::inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t le
//拷贝rtp
memcpy(&data[4], ptr, len);
//设置ntp时间戳
rtp->ntp_stamp = _ntp_stamp.getNtpStamp(ntohl(rtp->getHeader()->stamp), sample_rate);
onBeforeRtpSorted(rtp);
auto seq = rtp->getSeq();
sortPacket(seq, std::move(rtp));
return true;
}
void RtpTrack::setNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate, uint64_t ntp_stamp_ms){
_ntp_stamp.setNtpStamp(rtp_stamp, sample_rate, ntp_stamp_ms);
}
////////////////////////////////////////////////////////////////////////////////////
void RtpTrackImp::setOnSorted(OnSorted cb) {

View File

@ -16,6 +16,7 @@
#include <memory>
#include "RtpCodec.h"
#include "RtspMediaSource.h"
#include "Common/Stamp.h"
using namespace std;
using namespace toolkit;
@ -175,6 +176,7 @@ public:
void clear();
uint32_t getSSRC() const;
bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len);
void setNtpStamp(uint32_t rtp_stamp, uint32_t sample_rate, uint64_t ntp_stamp_ms);
protected:
virtual void onRtpSorted(RtpPacket::Ptr rtp) {}
@ -183,6 +185,7 @@ protected:
private:
uint32_t _ssrc = 0;
Ticker _ssrc_alive;
NtpStamp _ntp_stamp;
};
class RtpTrackImp : public RtpTrack{
@ -236,6 +239,17 @@ public:
return _track[index].inputRtp(type, sample_rate, ptr, len);
}
/**
* ntp时间戳rtcp sender report时设置
* @param index track下标索引
* @param rtp_stamp rtp时间戳
* @param sample_rate
* @param ntp_stamp_ms ntp时间戳
*/
void setNtpStamp(int index, uint32_t rtp_stamp, uint32_t sample_rate, uint64_t ntp_stamp_ms){
_track[index].setNtpStamp(rtp_stamp, sample_rate, ntp_stamp_ms);
}
void clear() {
for (auto &track : _track) {
track.clear();

View File

@ -525,32 +525,40 @@ string RtpHeader::dumpString(size_t rtp_size) const{
///////////////////////////////////////////////////////////////////////
RtpHeader* RtpPacket::getHeader(){
RtpHeader *RtpPacket::getHeader() {
//需除去rtcp over tcp 4个字节长度
return (RtpHeader*)(data() + RtpPacket::kRtpTcpHeaderSize);
return (RtpHeader *) (data() + RtpPacket::kRtpTcpHeaderSize);
}
string RtpPacket::dumpString() const{
const RtpHeader *RtpPacket::getHeader() const {
return (RtpHeader *) (data() + RtpPacket::kRtpTcpHeaderSize);
}
string RtpPacket::dumpString() const {
return ((RtpPacket *) this)->getHeader()->dumpString(size() - RtpPacket::kRtpTcpHeaderSize);
}
uint16_t RtpPacket::getSeq(){
uint16_t RtpPacket::getSeq() const {
return ntohs(getHeader()->seq);
}
uint32_t RtpPacket::getStampMS(){
return ntohl(getHeader()->stamp) * uint64_t(1000) / sample_rate;
uint32_t RtpPacket::getStamp() const {
return ntohl(getHeader()->stamp);
}
uint32_t RtpPacket::getSSRC(){
uint32_t RtpPacket::getStampMS() const {
return ntp_stamp & 0xFFFFFFFF;
}
uint32_t RtpPacket::getSSRC() const {
return ntohl(getHeader()->ssrc);
}
uint8_t* RtpPacket::getPayload(){
uint8_t *RtpPacket::getPayload() {
return getHeader()->getPayloadData();
}
size_t RtpPacket::getPayloadSize(){
size_t RtpPacket::getPayloadSize() const {
//需除去rtcp over tcp 4个字节长度
return getHeader()->getPayloadSize(size() - kRtpTcpHeaderSize);
}

View File

@ -154,24 +154,29 @@ public:
//获取rtp头
RtpHeader* getHeader();
const RtpHeader* getHeader() const;
//打印调试信息
string dumpString() const;
//主机字节序的seq
uint16_t getSeq();
uint16_t getSeq() const;
uint32_t getStamp() const;
//主机字节序的时间戳,已经转换为毫秒
uint32_t getStampMS();
uint32_t getStampMS() const;
//主机字节序的ssrc
uint32_t getSSRC();
uint32_t getSSRC() const;
//有效负载跳过csrc、ext
uint8_t* getPayload();
//有效负载长度不包括csrc、ext、padding
size_t getPayloadSize();
size_t getPayloadSize() const;
//音视频类型
TrackType type;
//音频为采样率视频一般为90000
uint32_t sample_rate;
//ntp时间戳
uint64_t ntp_stamp;
static Ptr create();

View File

@ -163,7 +163,7 @@ public:
auto stamp = rtp->getStampMS();
if (track) {
track->_seq = rtp->getSeq();
track->_time_stamp = stamp;
track->_time_stamp = rtp->getStamp() * uint64_t(1000) / rtp->sample_rate;
track->_ssrc = rtp->getSSRC();
}
if (!_ring) {

View File

@ -13,6 +13,36 @@
namespace mediakit {
class RingDelegateHelper : public RingDelegate<RtpPacket::Ptr> {
public:
RingDelegateHelper(RtspMuxer *delegate) {
_delegate = delegate;
}
void onWrite(RtpPacket::Ptr in, bool is_key) override {
_delegate->onRtp(std::move(in), is_key);
}
private:
RtspMuxer *_delegate;
};
void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) {
if (_rtp_stamp[in->type] != in->getHeader()->stamp) {
//rtp时间戳变化才计算ntp节省cpu资源
int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate;
int64_t stamp_ms_inc;
//求rtp时间戳增量
_stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc);
_rtp_stamp[in->type] = in->getHeader()->stamp;
_ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start;
}
//rtp拦截入口此处统一赋值ntp
in->ntp_stamp = _ntp_stamp[in->type];
_rtpRing->write(std::move(in), is_key);
}
RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title){
if(!title){
_sdp = std::make_shared<TitleSdp>()->getSdp();
@ -20,6 +50,9 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title){
_sdp = title->getSdp();
}
_rtpRing = std::make_shared<RtpRing::RingType>();
_rtpInterceptor = std::make_shared<RtpRing::RingType>();
_rtpInterceptor->setDelegate(std::make_shared<RingDelegateHelper>(this));
_ntp_stamp_start = getCurrentMillisecond(true);
}
void RtspMuxer::addTrack(const Track::Ptr &track) {
@ -36,15 +69,23 @@ void RtspMuxer::addTrack(const Track::Ptr &track) {
}
//设置rtp输出环形缓存
encoder->setRtpRing(_rtpRing);
encoder->setRtpRing(_rtpInterceptor);
//添加其sdp
_sdp.append(sdp->getSdp());
trySyncTrack();
}
void RtspMuxer::trySyncTrack() {
if (_encoder[TrackAudio] && _encoder[TrackVideo]) {
//音频时间戳同步于视频,因为音频时间戳被修改后不影响播放
_stamp[TrackAudio].syncTo(_stamp[TrackVideo]);
}
}
void RtspMuxer::inputFrame(const Frame::Ptr &frame) {
auto &encoder = _encoder[frame->getTrackType()];
if(encoder){
if (encoder) {
encoder->inputFrame(frame);
}
}

View File

@ -13,6 +13,7 @@
#include "Extension/Frame.h"
#include "Common/MediaSink.h"
#include "Common/Stamp.h"
#include "RtpCodec.h"
namespace mediakit{
@ -21,7 +22,8 @@ namespace mediakit{
*/
class RtspMuxer : public MediaSinkInterface{
public:
typedef std::shared_ptr<RtspMuxer> Ptr;
friend class RingDelegateHelper;
using Ptr = std::shared_ptr<RtspMuxer>;
/**
*
@ -57,10 +59,21 @@ public:
* track
*/
void resetTracks() override ;
private:
void onRtp(RtpPacket::Ptr in, bool is_key);
void computeNtp(const Frame::Ptr &frame);
void trySyncTrack();
private:
uint32_t _rtp_stamp[TrackMax]{0};
uint64_t _ntp_stamp[TrackMax]{0};
uint64_t _ntp_stamp_start;
string _sdp;
Stamp _stamp[TrackMax];
RtpCodec::Ptr _encoder[TrackMax];
RtpRing::RingType::Ptr _rtpRing;
RtpRing::RingType::Ptr _rtpInterceptor;
};

View File

@ -488,6 +488,11 @@ void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data
auto rtcp_arr = RtcpHeader::loadFromBytes((char *) data, len);
for (auto &rtcp : rtcp_arr) {
_rtcp_context[track_idx]->onRtcp(rtcp);
if ((RtcpType) rtcp->pt == RtcpType::RTCP_SR) {
auto sr = (RtcpSR *) (rtcp);
//设置rtp时间戳与ntp时间戳的对应关系
setNtpStamp(track_idx, sr->rtpts, track->_samplerate, sr->getNtpUnixStampMS());
}
}
}
@ -591,7 +596,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){
auto &rtcp_ctx = _rtcp_context[track_idx];
rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_ticker[track_idx];
if (ticker.elapsedTime() < 3 * 1000) {

View File

@ -360,7 +360,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type);
auto &ticker = _rtcp_send_ticker[track_index];
auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
//send rtcp every 5 second
if (ticker.elapsedTime() > 5 * 1000) {

View File

@ -186,6 +186,11 @@ void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, const char *
auto rtcp_arr = RtcpHeader::loadFromBytes((char *) data, len);
for (auto &rtcp : rtcp_arr) {
_rtcp_context[track_idx]->onRtcp(rtcp);
if ((RtcpType) rtcp->pt == RtcpType::RTCP_SR) {
auto sr = (RtcpSR *) (rtcp);
//设置rtp时间戳与ntp时间戳的对应关系
setNtpStamp(track_idx, sr->rtpts, track->_samplerate, sr->getNtpUnixStampMS());
}
}
}
@ -1126,12 +1131,14 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){
void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type);
auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_tickers[track_index];
//send rtcp every 5 second
if (ticker.elapsedTime() > 5 * 1000) {
if (ticker.elapsedTime() > 5 * 1000 || (_send_sr_rtcp[track_index] && !_push_src)) {
//确保在发送rtp前先发送一次sender report rtcp(用于播放器同步音视频)
ticker.resetTime();
_send_sr_rtcp[track_index] = false;
static auto send_rtcp = [](RtspSession *thiz, int index, Buffer::Ptr ptr) {
if (thiz->_rtp_type == Rtsp::RTP_TCP) {

View File

@ -215,6 +215,7 @@ private:
Ticker _rtcp_send_tickers[2];
//统计rtp并发送rtcp
vector<RtcpContext::Ptr> _rtcp_context;
bool _send_sr_rtcp[2] = {true, true};
};
/**

View File

@ -600,8 +600,6 @@ public:
auto seq = ntohs(rtp->seq);
//统计rtp接受情况便于生成nack rtcp包
_nack_ctx.received(seq);
//统计rtp收到的情况好做rr汇报
_rtcp_context.onRtp(seq, ntohl(rtp->stamp), sample_rate, len);
}
return RtpTrack::inputRtp(type, sample_rate, ptr, len);
}
@ -611,6 +609,14 @@ public:
return _rtcp_context.createRtcpRR(ssrc, getSSRC());
}
protected:
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) override {
//统计rtp收到的情况好做rr汇报
_rtcp_context.onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate,
rtp->size() - RtpPacket::kRtpTcpHeaderSize);
RtpTrackImp::onBeforeRtpSorted(rtp);
}
private:
NackContext _nack_ctx;
RtcpContext _rtcp_context{true};
@ -639,6 +645,8 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
if(!rtp_chn){
WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
} else {
//设置rtp时间戳与ntp时间戳的对应关系
rtp_chn->setNtpStamp(sr->rtpts, track->plan_rtp->sample_rate, sr->getNtpUnixStampMS());
auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp);
sendRtcpPacket(rr->data(), rr->size(), true);
}
@ -845,7 +853,7 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r
}
if (!rtx) {
//统计rtp发送情况好做sr汇报
track->rtcp_context_send->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
track->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
track->nack_list.push_back(rtp);
#if 0
//此处模拟发送丢包