解决rtsp播放器粘包问题

This commit is contained in:
xiongziliang 2018-10-30 10:31:27 +08:00
parent ae1d9371fa
commit 27bc19dd64
4 changed files with 203 additions and 142 deletions

View File

@ -55,11 +55,7 @@ RtspPlayer::RtspPlayer(void){
_pktPool.setSize(64); _pktPool.setSize(64);
} }
RtspPlayer::~RtspPlayer(void) { RtspPlayer::~RtspPlayer(void) {
teardown(); RtspPlayer::teardown();
if (_pucRtpBuf) {
delete[] _pucRtpBuf;
_pucRtpBuf = nullptr;
}
DebugL<<endl; DebugL<<endl;
} }
void RtspPlayer::teardown(){ void RtspPlayer::teardown(){
@ -72,7 +68,6 @@ void RtspPlayer::teardown(){
erase(kRtspRealm); erase(kRtspRealm);
_aTrackInfo.clear(); _aTrackInfo.clear();
_onHandshake = nullptr; _onHandshake = nullptr;
_uiRtpBufLen = 0;
_strSession.clear(); _strSession.clear();
_uiCseq = 1; _uiCseq = 1;
_strContentBase.clear(); _strContentBase.clear();
@ -130,9 +125,7 @@ void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPw
} }
_eType = eType; _eType = eType;
if (_eType == RTP_TCP && !_pucRtpBuf) {
_pucRtpBuf = new uint8_t[RTP_BUF_SIZE];
}
auto ip = FindField(strUrl, "://", "/"); auto ip = FindField(strUrl, "://", "/");
if (!ip.size()) { if (!ip.size()) {
ip = FindField(strUrl, "://", NULL); ip = FindField(strUrl, "://", NULL);
@ -174,45 +167,7 @@ void RtspPlayer::onConnect(const SockException &err){
} }
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
const char *buf = pBuf->data(); input(pBuf->data(),pBuf->size());
int size = pBuf->size();
if (_onHandshake) {
//rtsp回复
int offset = 0;
while(offset < size - 4){
char *pos = (char *)memchr(buf + offset, 'R', size - offset);
if(pos == NULL){
break;
}
if(memcmp(pos, "RTSP", 4) == 0){
try {
pos += onProcess(pos);
} catch (std::exception &err) {
SockException ex(Err_other, err.what());
onPlayResult_l(ex);
onShutdown_l(ex);
teardown();
return;
}
}else{
pos += 1;
}
offset = pos - buf;
}
}
if (_eType == RTP_TCP && _pucRtpBuf) {
//RTP data
while (size > 0) {
int added = RTP_BUF_SIZE - _uiRtpBufLen;
added = (added > size ? size : added);
memcpy(_pucRtpBuf + _uiRtpBufLen, buf, added);
_uiRtpBufLen += added;
size -= added;
buf += added;
splitRtp(_pucRtpBuf, _uiRtpBufLen);
}
}
} }
void RtspPlayer::onErr(const SockException &ex) { void RtspPlayer::onErr(const SockException &ex) {
onShutdown_l (ex); onShutdown_l (ex);
@ -260,7 +215,6 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
throw std::runtime_error( throw std::runtime_error(
StrPrinter << "DESCRIBE:" << parser.Url() << " " << parser.Tail() << endl); StrPrinter << "DESCRIBE:" << parser.Url() << " " << parser.Tail() << endl);
} }
auto strSdp = parser.Content();
_strContentBase = parser["Content-Base"]; _strContentBase = parser["Content-Base"];
if(_strContentBase.empty()){ if(_strContentBase.empty()){
@ -270,19 +224,14 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
_strContentBase.pop_back(); _strContentBase.pop_back();
} }
auto iLen = atoi(parser["Content-Length"].data());
if(iLen > 0){
strSdp.erase(iLen);
}
//解析sdp //解析sdp
_sdpAttr.load(strSdp); _sdpAttr.load(parser.Content());
_aTrackInfo = _sdpAttr.getAvailableTrack(); _aTrackInfo = _sdpAttr.getAvailableTrack();
if (_aTrackInfo.empty()) { if (_aTrackInfo.empty()) {
throw std::runtime_error("无有效的Sdp Track"); throw std::runtime_error("无有效的Sdp Track");
} }
if (!onCheckSDP(strSdp, _sdpAttr)) { if (!onCheckSDP(parser.Content(), _sdpAttr)) {
throw std::runtime_error("onCheckSDP faied"); throw std::runtime_error("onCheckSDP faied");
} }
@ -346,6 +295,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
_eType = RTP_UDP; _eType = RTP_UDP;
} }
RtspSplitter::enableRecvRtp(_eType == RTP_TCP);
if(_eType == RTP_TCP) { if(_eType == RTP_TCP) {
string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-"); string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-");
_aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.c_str()); _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.c_str());
@ -416,6 +367,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
bool RtspPlayer::sendOptions() { bool RtspPlayer::sendOptions() {
_onHandshake = [](const Parser& parser){ _onHandshake = [](const Parser& parser){
// DebugL << "options response";
return true; return true;
}; };
return sendRtspRequest("OPTIONS",_strContentBase); return sendRtspRequest("OPTIONS",_strContentBase);
@ -494,88 +446,37 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
} }
} }
int RtspPlayer::onProcess(const char* pcBuf) { void RtspPlayer::onWholeRtspPacket(Parser &parser) {
auto strRtsp = FindField(pcBuf, "RTSP", "\r\n\r\n"); try {
if(strRtsp.empty()){ decltype(_onHandshake) fun;
return 4; _onHandshake.swap(fun);
}
strRtsp = string("RTSP") + strRtsp + "\r\n\r\n";
Parser parser;
parser.Parse(strRtsp.data());
int iLen = 0;
if (parser.Url() == "200") {
iLen = atoi(parser["Content-Length"].data());
if (iLen) {
string strContent(pcBuf + strRtsp.size(), iLen);
parser.setContent(strContent);
}
}
auto fun = _onHandshake;
_onHandshake = nullptr;
if(fun){ if(fun){
fun(parser); fun(parser);
} }
parser.Clear(); parser.Clear();
return strRtsp.size() + iLen; } catch (std::exception &err) {
SockException ex(Err_other, err.what());
onPlayResult_l(ex);
onShutdown_l(ex);
teardown();
}
} }
void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
void RtspPlayer::splitRtp(unsigned char* pucRtp, unsigned int uiLen) { if(len > 1600){
unsigned char* rtp_ptr = pucRtp;
while (uiLen >= 4) {
if (rtp_ptr[0] == '$') {
//通道0
uint8_t interleaved = rtp_ptr[1];
uint16_t length = (rtp_ptr[2] << 8) | rtp_ptr[3];
if (length > 1600) {
//没有大于MTU的包 //没有大于MTU的包
//WarnL << "没有大于MTU的包:" << length; return;
rtp_ptr += 1;
uiLen -= 1;
continue;
}
if ((unsigned int) length + 4 + 4 > uiLen) {
//buf 太小,还没到该RTP包的结尾
break;
}
auto nextPkt = rtp_ptr + length + 4;
if (*nextPkt != '$' && memcmp(nextPkt,"RTSP",4)!=0 ) {
//没有找到该包的尾部
//WarnL << "没有找到该包的尾部";
rtp_ptr += 1;
uiLen -= 1;
continue;
} }
int trackIdx = -1; int trackIdx = -1;
uint8_t interleaved = data[1];
if(interleaved %2 ==0){ if(interleaved %2 ==0){
trackIdx = getTrackIndexByInterleaved(interleaved); trackIdx = getTrackIndexByInterleaved(interleaved);
} }
if (trackIdx != -1) { if (trackIdx != -1) {
handleOneRtp(trackIdx, rtp_ptr + 4, length); handleOneRtp(trackIdx, (unsigned char *)data + 4, len - 4);
}
rtp_ptr += (length + 4);
uiLen -= (length + 4);
continue;
}
unsigned char *pos = (unsigned char *) memchr(rtp_ptr + 1, '$', uiLen - 1);
if (pos == NULL) {
//缓存里面没有任何RTP包
//WarnL << "缓存里面没有任何RTP包";
uiLen = 0;
break;
}
//有RTP包起始头
uiLen -= (pos - rtp_ptr);
rtp_ptr = pos;
}
_uiRtpBufLen = uiLen;
if (rtp_ptr != pucRtp) {
memmove(pucRtp, rtp_ptr, uiLen);
} }
} }
# define AV_RB16(x) \ # define AV_RB16(x) \
((((const uint8_t*)(x))[0] << 8) | \ ((((const uint8_t*)(x))[0] << 8) | \
((const uint8_t*)(x))[1]) ((const uint8_t*)(x))[1])

View File

@ -39,6 +39,7 @@
#include "Poller/Timer.h" #include "Poller/Timer.h"
#include "Network/Socket.h" #include "Network/Socket.h"
#include "Network/TcpClient.h" #include "Network/TcpClient.h"
#include "RtspSplitter.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -46,7 +47,7 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
//实现了rtsp播放器协议部分的功能 //实现了rtsp播放器协议部分的功能
class RtspPlayer: public PlayerBase,public TcpClient { class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter {
public: public:
typedef std::shared_ptr<RtspPlayer> Ptr; typedef std::shared_ptr<RtspPlayer> Ptr;
@ -62,6 +63,19 @@ protected:
virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0; virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0;
uint32_t getProgressMilliSecond() const; uint32_t getProgressMilliSecond() const;
void seekToMilliSecond(uint32_t ms); void seekToMilliSecond(uint32_t ms);
/**
* rtsp包回调sdp等content数据
* @param parser rtsp包
*/
void onWholeRtspPacket(Parser &parser) override ;
/**
* rtp包回调
* @param data
* @param len
*/
void onRtpPacket(const char *data,uint64_t len) override ;
private: private:
void onShutdown_l(const SockException &ex); void onShutdown_l(const SockException &ex);
void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, int iTrackidx); void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, int iTrackidx);
@ -81,11 +95,6 @@ private:
void handleResDESCRIBE(const Parser &parser); void handleResDESCRIBE(const Parser &parser);
bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr);
void handleResPAUSE(const Parser &parser, bool bPause); void handleResPAUSE(const Parser &parser, bool bPause);
//发数据给服务器
int onProcess(const char* strBuf);
//生成rtp包结构体
void splitRtp(unsigned char *pucData, unsigned int uiLen);
//处理一个rtp包 //处理一个rtp包
bool handleOneRtp(int iTrackidx, unsigned char *ucData, unsigned int uiLen); bool handleOneRtp(int iTrackidx, unsigned char *ucData, unsigned int uiLen);
@ -103,9 +112,6 @@ private:
function<void(const Parser&)> _onHandshake; function<void(const Parser&)> _onHandshake;
RtspMediaSource::PoolType _pktPool; RtspMediaSource::PoolType _pktPool;
uint8_t *_pucRtpBuf = nullptr;
unsigned int _uiRtpBufLen = 0;
Socket::Ptr _apUdpSock[2]; Socket::Ptr _apUdpSock[2];
//rtsp info //rtsp info
string _strSession; string _strSession;

82
src/Rtsp/RtspSplitter.cpp Normal file
View File

@ -0,0 +1,82 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "RtspSplitter.h"
namespace mediakit{
const char *RtspSplitter::onSearchPacketTail(const char *data, int len) {
if(!_enableRecvRtp){
_isRtpPacket = false;
return HttpRequestSplitter::onSearchPacketTail(data, len);
}
if(data[0] != '$'){
//这是rtsp包
_isRtpPacket = false;
return HttpRequestSplitter::onSearchPacketTail(data, len);
}
//这是rtp包
if(len < 4){
//数据不够
return nullptr;
}
uint16_t length = (((uint8_t *)data)[2] << 8) | ((uint8_t *)data)[3];
if(len < length + 4){
//数据不够
return nullptr;
}
//返回rtp包末尾
_isRtpPacket = true;
return data + 4 + length;
}
int64_t RtspSplitter::onRecvHeader(const char *data, uint64_t len) {
if(_isRtpPacket){
onRtpPacket(data,len);
return 0;
}
_parser.Parse(data);
auto ret = atoi(_parser["Content-Length"].data());
if(ret == 0){
onWholeRtspPacket(_parser);
}
return ret;
}
void RtspSplitter::onRecvContent(const char *data, uint64_t len) {
_parser.setContent(string(data,len));
onWholeRtspPacket(_parser);
}
void RtspSplitter::enableRecvRtp(bool enable) {
_enableRecvRtp = enable;
}
}//namespace mediakit

72
src/Rtsp/RtspSplitter.h Normal file
View File

@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef ZLMEDIAKIT_RTSPSPLITTER_H
#define ZLMEDIAKIT_RTSPSPLITTER_H
#include "Rtsp.h"
#include "Http/HttpRequestSplitter.h"
namespace mediakit{
class RtspSplitter : public HttpRequestSplitter{
public:
RtspSplitter(){}
virtual ~RtspSplitter(){}
protected:
/**
* rtsp包回调sdp等content数据
* @param parser rtsp包
*/
virtual void onWholeRtspPacket(Parser &parser) = 0;
/**
* rtp包回调
* @param data
* @param len
*/
virtual void onRtpPacket(const char *data,uint64_t len) = 0;
/**
* rtp包
* @param enable
*/
void enableRecvRtp(bool enable);
protected:
const char *onSearchPacketTail(const char *data,int len) override ;
int64_t onRecvHeader(const char *data,uint64_t len) override;
void onRecvContent(const char *data,uint64_t len) override;
private:
bool _enableRecvRtp = false;
bool _isRtpPacket = false;
Parser _parser;
};
}//namespace mediakit
#endif //ZLMEDIAKIT_RTSPSPLITTER_H