rtmp点播采用简单握手,发送心跳包,兼容fms服务器: #2007

This commit is contained in:
ziyue 2022-09-30 13:34:56 +08:00
parent bfebcd62c7
commit 34838b1b26
3 changed files with 46 additions and 30 deletions

View File

@ -44,11 +44,16 @@ void RtmpPlayer::teardown() {
_deque_on_status.clear(); _deque_on_status.clear();
} }
void RtmpPlayer::play(const string &strUrl) { void RtmpPlayer::play(const string &url) {
teardown(); teardown();
string host_url = FindField(strUrl.data(), "://", "/"); string host_url = FindField(url.data(), "://", "/");
_app = FindField(strUrl.data(), (host_url + "/").data(), "/"); {
_stream_id = FindField(strUrl.data(), (host_url + "/" + _app + "/").data(), NULL); auto pos = url.find_last_of('/');
if (pos != string::npos) {
_stream_id = url.substr(pos + 1);
}
}
_app = FindField(url.data(), (host_url + "/").data(), ("/" + _stream_id).data());
_tc_url = string("rtmp://") + host_url + "/" + _app; _tc_url = string("rtmp://") + host_url + "/" + _app;
if (!_app.size() || !_stream_id.size()) { if (!_app.size() || !_stream_id.size()) {
@ -109,16 +114,16 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) {
//播放成功恢复rtmp接收超时定时器 //播放成功恢复rtmp接收超时定时器
_rtmp_recv_ticker.resetTime(); _rtmp_recv_ticker.resetTime();
auto timeout_ms = (*this)[Client::kMediaTimeoutMS].as<uint64_t>(); auto timeout_ms = (*this)[Client::kMediaTimeoutMS].as<uint64_t>();
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this()); weak_ptr<RtmpPlayer> weak_self = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
auto lam = [weakSelf, timeout_ms]() { auto lam = [weak_self, timeout_ms]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return false; return false;
} }
if (strongSelf->_rtmp_recv_ticker.elapsedTime() > timeout_ms) { if (strong_self->_rtmp_recv_ticker.elapsedTime() > timeout_ms) {
//接收rtmp媒体数据超时 //接收rtmp媒体数据超时
SockException ex(Err_timeout, "receive rtmp timeout"); SockException ex(Err_timeout, "receive rtmp timeout");
strongSelf->onPlayResult_l(ex, true); strong_self->onPlayResult_l(ex, true);
return false; return false;
} }
return true; return true;
@ -135,14 +140,12 @@ void RtmpPlayer::onConnect(const SockException &err){
onPlayResult_l(err, false); onPlayResult_l(err, false);
return; return;
} }
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this()); weak_ptr<RtmpPlayer> weak_self = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
startClientSession([weakSelf]() { startClientSession([weak_self]() {
auto strongSelf = weakSelf.lock(); if (auto strong_self = weak_self.lock()) {
if (!strongSelf) { strong_self->send_connect();
return;
} }
strongSelf->send_connect(); },_app.find("vod") != 0); // 实测发现vod点播时使用复杂握手fms无响应issue #2007
});
} }
void RtmpPlayer::onRecv(const Buffer::Ptr &buf){ void RtmpPlayer::onRecv(const Buffer::Ptr &buf){
@ -249,14 +252,14 @@ inline void RtmpPlayer::send_pause(bool pause) {
_beat_timer.reset(); _beat_timer.reset();
if (pause) { if (pause) {
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this()); weak_ptr<RtmpPlayer> weak_self = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
_beat_timer.reset(new Timer((*this)[Client::kBeatIntervalMS].as<int>() / 1000.0f, [weakSelf]() { _beat_timer.reset(new Timer((*this)[Client::kBeatIntervalMS].as<int>() / 1000.0f, [weak_self]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return false; return false;
} }
uint32_t timeStamp = (uint32_t)::time(NULL); uint32_t timeStamp = (uint32_t)::time(NULL);
strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); strong_self->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
return true; return true;
}, getPoller())); }, getPoller()));
} }

View File

@ -288,12 +288,14 @@ const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len){
} }
////for client//// ////for client////
void RtmpProtocol::startClientSession(const function<void()> &func) { void RtmpProtocol::startClientSession(const function<void()> &func, bool complex) {
//发送 C0C1 //发送 C0C1
char handshake_head = HANDSHAKE_PLAINTEXT; char handshake_head = HANDSHAKE_PLAINTEXT;
onSendRawData(obtainBuffer(&handshake_head, 1)); onSendRawData(obtainBuffer(&handshake_head, 1));
RtmpHandshake c1(0); RtmpHandshake c1(0);
if (complex) {
c1.create_complex_c0c1(); c1.create_complex_c0c1();
}
onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1))); onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1)));
_next_step_func = [this, func](const char *data, size_t len) { _next_step_func = [this, func](const char *data, size_t len) {
//等待 S0+S1+S2 //等待 S0+S1+S2
@ -754,7 +756,8 @@ void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) {
case MSG_WIN_SIZE: { case MSG_WIN_SIZE: {
//如果窗口太小会导致发送sendAcknowledgement时无限递归https://github.com/ZLMediaKit/ZLMediaKit/issues/1839 //如果窗口太小会导致发送sendAcknowledgement时无限递归https://github.com/ZLMediaKit/ZLMediaKit/issues/1839
_windows_size = max(load_be32(&chunk_data.buffer[0]), 32 * 1024U); //窗口太大也可能导致fms服务器认为播放器心跳超时
_windows_size = min(max(load_be32(&chunk_data.buffer[0]), 32 * 1024U), 1280 * 1024U);
TraceL << "MSG_WIN_SIZE:" << _windows_size; TraceL << "MSG_WIN_SIZE:" << _windows_size;
break; break;
} }
@ -806,7 +809,15 @@ void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) {
break; break;
} }
default: onRtmpChunk(std::move(packet)); break; default: {
_bytes_recv += packet->size();
if (_windows_size > 0 && _bytes_recv - _bytes_recv_last >= _windows_size) {
_bytes_recv_last = _bytes_recv;
sendAcknowledgement(_bytes_recv);
}
onRtmpChunk(std::move(packet));
break;
}
} }
} }

View File

@ -32,7 +32,7 @@ public:
void onParseRtmp(const char *data, size_t size); void onParseRtmp(const char *data, size_t size);
//作为客户端发送c0c1等待s0s1s2并且回调 //作为客户端发送c0c1等待s0s1s2并且回调
void startClientSession(const std::function<void()> &cb); void startClientSession(const std::function<void()> &cb, bool complex = true);
protected: protected:
virtual void onSendRawData(toolkit::Buffer::Ptr buffer) = 0; virtual void onSendRawData(toolkit::Buffer::Ptr buffer) = 0;
@ -94,8 +94,10 @@ private:
size_t _chunk_size_in = DEFAULT_CHUNK_LEN; size_t _chunk_size_in = DEFAULT_CHUNK_LEN;
size_t _chunk_size_out = DEFAULT_CHUNK_LEN; size_t _chunk_size_out = DEFAULT_CHUNK_LEN;
////////////Acknowledgement//////////// ////////////Acknowledgement////////////
uint32_t _bytes_sent = 0; uint64_t _bytes_sent = 0;
uint32_t _bytes_sent_last = 0; uint64_t _bytes_sent_last = 0;
uint64_t _bytes_recv = 0;
uint64_t _bytes_recv_last = 0;
uint32_t _windows_size = 0; uint32_t _windows_size = 0;
///////////PeerBandwidth/////////// ///////////PeerBandwidth///////////
uint32_t _bandwidth = 2500000; uint32_t _bandwidth = 2500000;