diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index d00c8e7a..247a6d19 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -91,14 +91,14 @@ void RtmpPlayer::onErr(const SockException &ex){ onPlayResult_l(ex, !_play_timer); } -void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshakeCompleted) { +void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) { if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return; } WarnL << ex.getErrCode() << " " << ex.what(); - if (!handshakeCompleted) { + if (!handshake_done) { //开始播放阶段 _play_timer.reset(); //是否为性能测试模式 @@ -152,14 +152,14 @@ void RtmpPlayer::onConnect(const SockException &err){ }); } -void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){ +void RtmpPlayer::onRecv(const Buffer::Ptr &buf){ try { if (_benchmark_mode && !_play_timer) { //在性能测试模式下,如果rtmp握手完毕后,不再解析rtmp包 _rtmp_recv_ticker.resetTime(); return; } - onParseRtmp(pBuf->data(), pBuf->size()); + onParseRtmp(buf->data(), buf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPlayTimer为空后表明握手结束了 @@ -226,21 +226,21 @@ inline void RtmpPlayer::send_play() { addOnStatusCB(fun); } -inline void RtmpPlayer::send_pause(bool bPause) { +inline void RtmpPlayer::send_pause(bool pause) { AMFEncoder enc; - enc << "pause" << ++_send_req_id << nullptr << bPause; + enc << "pause" << ++_send_req_id << nullptr << pause; sendRequest(MSG_CMD, enc.data()); - auto fun = [this, bPause](AMFValue &val) { + auto fun = [this, pause](AMFValue &val) { //TraceL << "pause onStatus"; auto level = val["level"].as_string(); auto code = val["code"].as_string(); if (level != "status") { - if (!bPause) { + if (!pause) { throw std::runtime_error(StrPrinter << "pause 恢复播放失败:" << level << " " << code << endl); } } else { - _paused = bPause; - if (!bPause) { + _paused = pause; + if (!pause) { onPlayResult_l(SockException(Err_success, "resum rtmp success"), true); } else { //暂停播放 @@ -251,7 +251,7 @@ inline void RtmpPlayer::send_pause(bool bPause) { addOnStatusCB(fun); _beat_timer.reset(); - if (bPause) { + if (pause) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _beat_timer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf]() { auto strongSelf = weakSelf.lock(); @@ -314,32 +314,32 @@ void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) { _metadata_got = true; } -void RtmpPlayer::onStreamDry(uint32_t stream_id) { - //TraceL << stream_id; +void RtmpPlayer::onStreamDry(uint32_t stream_index) { + //TraceL << stream_index; onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true); } -void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) { +void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) { _rtmp_recv_ticker.resetTime(); if (!_play_timer) { //已经触发了onPlayResult事件,直接触发onMediaData事件 - onMediaData(packet); + onMediaData(chunk_data); return; } - if (packet->isCfgFrame()) { + if (chunk_data->isCfgFrame()) { //输入配置帧以便初始化完成各个track - onMediaData(packet); + onMediaData(chunk_data); } else { //先触发onPlayResult事件,这个时候解码器才能初始化完毕 onPlayResult_l(SockException(Err_success, "play rtmp success"), false); //触发onPlayResult事件后,再把帧数据输入到解码器 - onMediaData(packet); + onMediaData(chunk_data); } } -void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { +void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) { typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec); static unordered_map s_func_map; static onceToken token([]() { @@ -349,12 +349,12 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { s_func_map.emplace("onMetaData", &RtmpPlayer::onCmd_onMetaData); }); - switch (chunkData.type_id) { + switch (chunk_data.type_id) { case MSG_CMD: case MSG_CMD3: case MSG_DATA: case MSG_DATA3: { - AMFDecoder dec(chunkData.buffer, 0); + AMFDecoder dec(chunk_data.buffer, 0); std::string type = dec.load(); auto it = s_func_map.find(type); if (it != s_func_map.end()) { @@ -368,10 +368,10 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { case MSG_AUDIO: case MSG_VIDEO: { - auto idx = chunkData.type_id % 2; + auto idx = chunk_data.type_id % 2; if (_now_stamp_ticker[idx].elapsedTime() > 500) { //计算播放进度时间轴用 - _now_stamp[idx] = chunkData.time_stamp; + _now_stamp[idx] = chunk_data.time_stamp; } if (!_metadata_got) { if (!onCheckMeta(TitleMeta().getMetadata())) { @@ -379,7 +379,7 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { } _metadata_got = true; } - onMediaData_l(std::make_shared(std::move(chunkData))); + onMediaData_l(std::make_shared(std::move(chunk_data))); break; } diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 51a42228..08c0bde0 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -41,34 +41,34 @@ public: void teardown() override; protected: - virtual bool onCheckMeta(const AMFValue &val) =0; - virtual void onMediaData(const RtmpPacket::Ptr &chunkData) =0; + virtual bool onCheckMeta(const AMFValue &val) = 0; + virtual void onMediaData(const RtmpPacket::Ptr &chunk_data) = 0; uint32_t getProgressMilliSecond() const; void seekToMilliSecond(uint32_t ms); protected: - void onMediaData_l(const RtmpPacket::Ptr &chunkData); + void onMediaData_l(const RtmpPacket::Ptr &chunk_data); //在获取config帧后才触发onPlayResult_l(而不是收到play命令回复),所以此时所有track都初始化完毕了 - void onPlayResult_l(const SockException &ex, bool handshakeCompleted); + void onPlayResult_l(const SockException &ex, bool handshake_done); //form Tcpclient - void onRecv(const Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &buf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; //from RtmpProtocol - void onRtmpChunk(RtmpPacket &chunkData) override; - void onStreamDry(uint32_t ui32StreamId) override; - void onSendRawData(const Buffer::Ptr &buffer) override{ + void onRtmpChunk(RtmpPacket &chunk_data) override; + void onStreamDry(uint32_t stream_index) override; + void onSendRawData(const Buffer::Ptr &buffer) override { send(buffer); } template - inline void addOnResultCB(const FUNC &func) { + void addOnResultCB(const FUNC &func) { lock_guard lck(_mtx_on_result); _map_on_result.emplace(_send_req_id, func); } template - inline void addOnStatusCB(const FUNC &func) { + void addOnStatusCB(const FUNC &func) { lock_guard lck(_mtx_on_status); _deque_on_status.emplace_back(func); } @@ -77,10 +77,10 @@ protected: void onCmd_onStatus(AMFDecoder &dec); void onCmd_onMetaData(AMFDecoder &dec); - inline void send_connect(); - inline void send_createStream(); - inline void send_play(); - inline void send_pause(bool bPause); + void send_connect(); + void send_createStream(); + void send_play(); + void send_pause(bool pause); private: string _app; diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index d1ed4d5a..8c658bc1 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -15,11 +15,6 @@ #include "Thread/ThreadPool.h" using namespace toolkit; -#ifdef ENABLE_OPENSSL -#include "Util/SSLBox.h" -#include -#include - #define C1_DIGEST_SIZE 32 #define C1_KEY_SIZE 128 #define C1_SCHEMA_SIZE 764 @@ -29,6 +24,11 @@ using namespace toolkit; #define S2_FMS_KEY_SIZE 68 #define C1_OFFSET_SIZE 4 +#ifdef ENABLE_OPENSSL +#include "Util/SSLBox.h" +#include +#include + static string openssl_HMACsha256(const void *key, unsigned int key_len, const void *data,unsigned int data_len){ std::shared_ptr out(new char[32], [](char *ptr) { delete[] ptr; }); unsigned int out_len; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index e8dddac7..42f0dc82 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -18,141 +18,147 @@ using namespace mediakit::Client; namespace mediakit { -RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){ - _pMediaSrc=src; +RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &src) : TcpClient(poller){ + _publish_src = src; } RtmpPusher::~RtmpPusher() { teardown(); DebugL << endl; } + void RtmpPusher::teardown() { if (alive()) { - _strApp.clear(); - _strStream.clear(); - _strTcUrl.clear(); + _app.clear(); + _stream_id.clear(); + _tc_url.clear(); { - lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.clear(); + lock_guard lck(_mtx_on_result); + _map_on_result.clear(); } { - lock_guard lck(_mtxOnStatusCB); - _dqOnStatusCB.clear(); + lock_guard lck(_mtx_on_status); + _deque_on_status.clear(); } - _pPublishTimer.reset(); + _publish_timer.reset(); reset(); - shutdown(SockException(Err_shutdown,"teardown")); + shutdown(SockException(Err_shutdown, "teardown")); } } -void RtmpPusher::onPublishResult(const SockException &ex,bool handshakeCompleted) { - if(!handshakeCompleted){ +void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { + if (ex.getErrCode() == Err_shutdown) { + //主动shutdown的,不触发回调 + return; + } + if (!handshake_done) { //播放结果回调 - _pPublishTimer.reset(); - if(_onPublished){ - _onPublished(ex); + _publish_timer.reset(); + if (_on_published) { + _on_published(ex); } } else { //播放成功后异常断开回调 - if(_onShutdown){ - _onShutdown(ex); + if (_on_shutdown) { + _on_shutdown(ex); } } - if(ex){ + if (ex) { teardown(); } } -void RtmpPusher::publish(const string &strUrl) { +void RtmpPusher::publish(const string &url) { teardown(); - string strHost = FindField(strUrl.data(), "://", "/"); - _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/"); - _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL); - _strTcUrl = string("rtmp://") + strHost + "/" + _strApp; + string host_url = FindField(url.data(), "://", "/"); + _app = FindField(url.data(), (host_url + "/").data(), "/"); + _stream_id = FindField(url.data(), (host_url + "/" + _app + "/").data(), NULL); + _tc_url = string("rtmp://") + host_url + "/" + _app; - if (!_strApp.size() || !_strStream.size()) { - onPublishResult(SockException(Err_other,"rtmp url非法"),false); + if (!_app.size() || !_stream_id.size()) { + onPublishResult(SockException(Err_other, "rtmp url非法"), false); return; } - DebugL << strHost << " " << _strApp << " " << _strStream; + DebugL << host_url << " " << _app << " " << _stream_id; - auto iPort = atoi(FindField(strHost.data(), ":", NULL).data()); + auto iPort = atoi(FindField(host_url.data(), ":", NULL).data()); if (iPort <= 0) { //rtmp 默认端口1935 iPort = 1935; } else { //服务器域名 - strHost = FindField(strHost.data(), NULL, ":"); + host_url = FindField(host_url.data(), NULL, ":"); } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); float publishTimeOutSec = (*this)[kTimeoutMS].as() / 1000.0; - _pPublishTimer.reset( new Timer(publishTimeOutSec, [weakSelf]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + _publish_timer.reset(new Timer(publishTimeOutSec, [weakSelf]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { return false; } - strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"), false); + strongSelf->onPublishResult(SockException(Err_timeout, "publish rtmp timeout"), false); return false; - },getPoller())); + }, getPoller())); - if(!(*this)[kNetAdapter].empty()){ + if (!(*this)[kNetAdapter].empty()) { setNetAdapter((*this)[kNetAdapter]); } - startConnect(strHost, iPort); + startConnect(host_url, iPort); } void RtmpPusher::onErr(const SockException &ex){ //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex,!_pPublishTimer); + onPublishResult(ex, !_publish_timer); } + void RtmpPusher::onConnect(const SockException &err){ - if(err) { - onPublishResult(err,false); + if (err) { + onPublishResult(err, false); return; } //推流器不需要多大的接收缓存,节省内存占用 _sock->setReadBuffer(std::make_shared(1 * 1024)); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - startClientSession([weakSelf](){ - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + startClientSession([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->sendChunkSize(60000); - strongSelf->send_connect(); + strong_self->sendChunkSize(60000); + strong_self->send_connect(); }); } -void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){ + +void RtmpPusher::onRecv(const Buffer::Ptr &buf){ try { - onParseRtmp(pBuf->data(), pBuf->size()); + onParseRtmp(buf->data(), buf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex,!_pPublishTimer); + onPublishResult(ex, !_publish_timer); } } - inline void RtmpPusher::send_connect() { AMFValue obj(AMF_OBJECT); - obj.set("app", _strApp); + obj.set("app", _app); obj.set("type", "nonprivate"); - obj.set("tcUrl", _strTcUrl); - obj.set("swfUrl", _strTcUrl); + obj.set("tcUrl", _tc_url); + obj.set("swfUrl", _tc_url); sendInvoke("connect", obj); - addOnResultCB([this](AMFDecoder &dec){ + addOnResultCB([this](AMFDecoder &dec) { //TraceL << "connect result"; dec.load(); auto val = dec.load(); auto level = val["level"].as_string(); auto code = val["code"].as_string(); - if(level != "status"){ - throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl); + if (level != "status") { + throw std::runtime_error(StrPrinter << "connect 失败:" << level << " " << code << endl); } send_createStream(); }); @@ -161,23 +167,24 @@ inline void RtmpPusher::send_connect() { inline void RtmpPusher::send_createStream() { AMFValue obj(AMF_NULL); sendInvoke("createStream", obj); - addOnResultCB([this](AMFDecoder &dec){ + addOnResultCB([this](AMFDecoder &dec) { //TraceL << "createStream result"; dec.load(); _stream_index = dec.load(); send_publish(); }); } + inline void RtmpPusher::send_publish() { AMFEncoder enc; - enc << "publish" << ++_send_req_id << nullptr << _strStream << _strApp ; + enc << "publish" << ++_send_req_id << nullptr << _stream_id << _app; sendRequest(MSG_CMD, enc.data()); addOnStatusCB([this](AMFValue &val) { auto level = val["level"].as_string(); auto code = val["code"].as_string(); - if(level != "status") { - throw std::runtime_error(StrPrinter <<"publish 失败:" << level << " " << code << endl); + if (level != "status") { + throw std::runtime_error(StrPrinter << "publish 失败:" << level << " " << code << endl); } //start send media send_metaData(); @@ -185,51 +192,51 @@ inline void RtmpPusher::send_publish() { } inline void RtmpPusher::send_metaData(){ - auto src = _pMediaSrc.lock(); + auto src = _publish_src.lock(); if (!src) { throw std::runtime_error("the media source was released"); } AMFEncoder enc; - enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); + enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); sendRequest(MSG_DATA, enc.data()); - - src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id ); + + src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { + sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id); }); - - _pRtmpReader = src->getRing()->attach(getPoller()); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + + _rtmp_reader = src->getRing()->attach(getPoller()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _rtmp_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } int i = 0; int size = pkt->size(); - strongSelf->setSendFlushFlag(false); - pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ - if(++i == size){ - strongSelf->setSendFlushFlag(true); + strong_self->setSendFlushFlag(false); + pkt->for_each([&](const RtmpPacket::Ptr &rtmp) { + if (++i == size) { + strong_self->setSendFlushFlag(true); } - strongSelf->sendRtmp(rtmp->type_id, strongSelf->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); + strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); }); }); - _pRtmpReader->setDetachCB([weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer); + _rtmp_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); } }); - onPublishResult(SockException(Err_success,"success"), false); + onPublishResult(SockException(Err_success, "success"), false); //提升发送性能 setSocketFlags(); } void RtmpPusher::setSocketFlags(){ GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); - if(mergeWriteMS > 0) { + if (mergeWriteMS > 0) { //提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); SockUtil::setNoDelay(_sock->rawFD(), false); @@ -237,70 +244,72 @@ void RtmpPusher::setSocketFlags(){ } void RtmpPusher::onCmd_result(AMFDecoder &dec){ - auto iReqId = dec.load(); - lock_guard lck(_mtxOnResultCB); - auto it = _mapOnResultCB.find(iReqId); - if(it != _mapOnResultCB.end()){ + auto req_id = dec.load(); + lock_guard lck(_mtx_on_result); + auto it = _map_on_result.find(req_id); + if (it != _map_on_result.end()) { it->second(dec); - _mapOnResultCB.erase(it); - }else{ + _map_on_result.erase(it); + } else { WarnL << "unhandled _result"; } } + void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) { AMFValue val; - while(true){ + while (true) { val = dec.load(); - if(val.type() == AMF_OBJECT){ + if (val.type() == AMF_OBJECT) { break; } } - if(val.type() != AMF_OBJECT){ + if (val.type() != AMF_OBJECT) { throw std::runtime_error("onStatus:the result object was not found"); } - lock_guard lck(_mtxOnStatusCB); - if(_dqOnStatusCB.size()){ - _dqOnStatusCB.front()(val); - _dqOnStatusCB.pop_front(); - }else{ + lock_guard lck(_mtx_on_status); + if (_deque_on_status.size()) { + _deque_on_status.front()(val); + _deque_on_status.pop_front(); + } else { auto level = val["level"]; auto code = val["code"].as_string(); - if(level.type() == AMF_STRING){ - if(level.as_string() != "status"){ - throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl); + if (level.type() == AMF_STRING) { + if (level.as_string() != "status") { + throw std::runtime_error(StrPrinter << "onStatus 失败:" << level.as_string() << " " << code << endl); } } } } -void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) { - switch (chunkData.type_id) { +void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) { + switch (chunk_data.type_id) { case MSG_CMD: case MSG_CMD3: { typedef void (RtmpPusher::*rtmpCMDHandle)(AMFDecoder &dec); static unordered_map g_mapCmd; static onceToken token([]() { - g_mapCmd.emplace("_error",&RtmpPusher::onCmd_result); - g_mapCmd.emplace("_result",&RtmpPusher::onCmd_result); - g_mapCmd.emplace("onStatus",&RtmpPusher::onCmd_onStatus); + g_mapCmd.emplace("_error", &RtmpPusher::onCmd_result); + g_mapCmd.emplace("_result", &RtmpPusher::onCmd_result); + g_mapCmd.emplace("onStatus", &RtmpPusher::onCmd_onStatus); }); - AMFDecoder dec(chunkData.buffer, 0); + AMFDecoder dec(chunk_data.buffer, 0); std::string type = dec.load(); auto it = g_mapCmd.find(type); - if(it != g_mapCmd.end()){ + if (it != g_mapCmd.end()) { auto fun = it->second; (this->*fun)(dec); - }else{ + } else { WarnL << "can not support cmd:" << type; } - } break; + } + default: - //WarnL << "unhandled message:" << (int) chunkData.type_id << hexdump(chunkData.buffer.data(), chunkData.buffer.size()); + //WarnL << "unhandled message:" << (int) chunk_data.type_id << hexdump(chunk_data.buffer.data(), chunk_data.buffer.size()); break; - } + } } diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 9614c9f7..3d7e7044 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -18,46 +18,47 @@ namespace mediakit { -class RtmpPusher: public RtmpProtocol , public TcpClient , public PusherBase{ +class RtmpPusher : public RtmpProtocol, public TcpClient, public PusherBase { public: typedef std::shared_ptr Ptr; RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src); - virtual ~RtmpPusher(); - - void publish(const string &strUrl) override ; + ~RtmpPusher() override; + void publish(const string &url) override ; void teardown() override; void setOnPublished(const Event &cb) override { - _onPublished = cb; + _on_published = cb; } void setOnShutdown(const Event &cb) override{ - _onShutdown = cb; + _on_shutdown = cb; } + protected: //for Tcpclient override - void onRecv(const Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &buf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; //for RtmpProtocol override - void onRtmpChunk(RtmpPacket &chunkData) override; + void onRtmpChunk(RtmpPacket &chunk_data) override; void onSendRawData(const Buffer::Ptr &buffer) override{ send(buffer); } + private: - void onPublishResult(const SockException &ex,bool handshakeCompleted); + void onPublishResult(const SockException &ex, bool handshake_done); template inline void addOnResultCB(const FUN &fun) { - lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.emplace(_send_req_id, fun); + lock_guard lck(_mtx_on_result); + _map_on_result.emplace(_send_req_id, fun); } template inline void addOnStatusCB(const FUN &fun) { - lock_guard lck(_mtxOnStatusCB); - _dqOnStatusCB.emplace_back(fun); + lock_guard lck(_mtx_on_status); + _deque_on_status.emplace_back(fun); } void onCmd_result(AMFDecoder &dec); @@ -69,23 +70,25 @@ private: inline void send_publish(); inline void send_metaData(); void setSocketFlags(); -private: - string _strApp; - string _strStream; - string _strTcUrl; - unordered_map > _mapOnResultCB; - recursive_mutex _mtxOnResultCB; - deque > _dqOnStatusCB; - recursive_mutex _mtxOnStatusCB; - //超时功能实现 - std::shared_ptr _pPublishTimer; - //源 - std::weak_ptr _pMediaSrc; - RtmpMediaSource::RingType::RingReader::Ptr _pRtmpReader; +private: + string _app; + string _stream_id; + string _tc_url; + + recursive_mutex _mtx_on_result; + recursive_mutex _mtx_on_status; + deque > _deque_on_status; + unordered_map > _map_on_result; + //事件监听 - Event _onShutdown; - Event _onPublished; + Event _on_shutdown; + Event _on_published; + + //推流超时定时器 + std::shared_ptr _publish_timer; + std::weak_ptr _publish_src; + RtmpMediaSource::RingType::RingReader::Ptr _rtmp_reader; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 6c960cb9..6fb012f2 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -732,14 +732,14 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &tra } } -void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { +void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshake_done) { if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 return; } WarnL << ex.getErrCode() << " " << ex.what(); - if (!handshakeCompleted) { + if (!handshake_done) { //开始播放阶段 _play_check_timer.reset(); onPlayResult(ex); diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index e1c485fa..e6529a23 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -87,7 +87,7 @@ protected: private: void onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track); - void onPlayResult_l(const SockException &ex , bool handshakeCompleted); + void onPlayResult_l(const SockException &ex , bool handshake_done); int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByTrackType(TrackType track_type) const; diff --git a/src/Rtsp/RtspPlayerImp.h b/src/Rtsp/RtspPlayerImp.h index 2c9ea972..814a4299 100644 --- a/src/Rtsp/RtspPlayerImp.h +++ b/src/Rtsp/RtspPlayerImp.h @@ -25,46 +25,51 @@ using namespace toolkit; namespace mediakit { -class RtspPlayerImp: public PlayerImp { +class RtspPlayerImp : public PlayerImp { public: typedef std::shared_ptr Ptr; - RtspPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller){} - virtual ~RtspPlayerImp(){ - DebugL< 0){ + + RtspPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} + ~RtspPlayerImp() override{ + DebugL << endl; + } + + float getProgress() const override { + if (getDuration() > 0) { return getProgressMilliSecond() / (getDuration() * 1000); } return PlayerBase::getProgress(); - - }; - void seekTo(float fProgress) override{ - fProgress = MAX(float(0),MIN(fProgress,float(1.0))); + + } + + void seekTo(float fProgress) override { + fProgress = MAX(float(0), MIN(fProgress, float(1.0))); seekToMilliSecond(fProgress * getDuration() * 1000); - }; + } + private: //派生类回调函数 bool onCheckSDP(const string &sdp) override { - _pRtspMediaSrc = dynamic_pointer_cast(_pMediaSrc); - if(_pRtspMediaSrc){ - _pRtspMediaSrc->setSdp(sdp); + _rtsp_media_src = dynamic_pointer_cast(_pMediaSrc); + if (_rtsp_media_src) { + _rtsp_media_src->setSdp(sdp); } _delegate.reset(new RtspDemuxer); _delegate->loadSdp(sdp); return true; } + void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) override { - if(_pRtspMediaSrc){ + if (_rtsp_media_src) { // rtsp直接代理是无法判断该rtp是否是I帧,所以GOP缓存基本是无效的 // 为了减少内存使用,那么我们设置为一直关键帧以便清空GOP缓存 - _pRtspMediaSrc->onWrite(rtp,true); + _rtsp_media_src->onWrite(rtp, true); } _delegate->inputRtp(rtp); - if(_maxAnalysisMS && _delegate->isInited(_maxAnalysisMS)){ - PlayerImp::onPlayResult(SockException(Err_success,"play rtsp success")); - _maxAnalysisMS = 0; + if (_max_analysis_ms && _delegate->isInited(_max_analysis_ms)) { + PlayerImp::onPlayResult(SockException(Err_success, "play rtsp success")); + _max_analysis_ms = 0; } } @@ -74,17 +79,18 @@ private: //如果超过这个时间还未获取成功,那么会强制触发onPlayResult事件(虽然此时有些track还未初始化成功) void onPlayResult(const SockException &ex) override { //isInited判断条件:无超时 - if(ex || _delegate->isInited(0)){ + if (ex || _delegate->isInited(0)) { //已经初始化成功,说明sdp里面有完善的信息 - PlayerImp::onPlayResult(ex); - }else{ + PlayerImp::onPlayResult(ex); + } else { //还没初始化成功,说明sdp里面信息不完善,还有一些track未初始化成功 - _maxAnalysisMS = (*this)[Client::kMaxAnalysisMS]; + _max_analysis_ms = (*this)[Client::kMaxAnalysisMS]; } } + private: - RtspMediaSource::Ptr _pRtspMediaSrc; - int _maxAnalysisMS = 0; + int _max_analysis_ms = 0; + RtspMediaSource::Ptr _rtsp_media_src; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 16dd12b7..fef34056 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -17,8 +17,8 @@ using namespace mediakit::Client; namespace mediakit { -RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){ - _pMediaSrc = src; +RtspPusher::RtspPusher(const EventPoller::Ptr &poller, const RtspMediaSource::Ptr &src) : TcpClient(poller) { + _push_src = src; } RtspPusher::~RtspPusher() { @@ -28,30 +28,30 @@ RtspPusher::~RtspPusher() { void RtspPusher::teardown() { if (alive()) { - sendRtspRequest("TEARDOWN" ,_strContentBase); - shutdown(SockException(Err_shutdown,"teardown")); + sendRtspRequest("TEARDOWN", _content_base); + shutdown(SockException(Err_shutdown, "teardown")); } reset(); - CLEAR_ARR(_apUdpSock); - _rtspMd5Nonce.clear(); - _rtspRealm.clear(); - _aTrackInfo.clear(); - _strSession.clear(); - _strContentBase.clear(); - _strSession.clear(); - _uiCseq = 1; - _pPublishTimer.reset(); - _pBeatTimer.reset(); - _pRtspReader.reset(); - _aTrackInfo.clear(); - _onHandshake = nullptr; + CLEAR_ARR(_udp_socks); + _nonce.clear(); + _realm.clear(); + _track_vec.clear(); + _session_id.clear(); + _content_base.clear(); + _session_id.clear(); + _cseq = 1; + _publish_timer.reset(); + _beat_timer.reset(); + _rtsp_reader.reset(); + _track_vec.clear(); + _on_res_func = nullptr; } -void RtspPusher::publish(const string &strUrl) { +void RtspPusher::publish(const string &url_str) { RtspUrl url; - if(!url.parse(strUrl)){ - onPublishResult(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false); + if (!url.parse(url_str)) { + onPublishResult(SockException(Err_other, StrPrinter << "illegal rtsp url:" << url_str), false); return; } @@ -65,55 +65,60 @@ void RtspPusher::publish(const string &strUrl) { (*this)[kRtspPwdIsMD5] = false; } - _strUrl = strUrl; - _eType = (Rtsp::eRtpType)(int)(*this)[kRtpType]; - DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " << (url._passwd.size() ? url._passwd : "null") << " " << _eType; + _url = url_str; + _rtp_type = (Rtsp::eRtpType) (int) (*this)[kRtpType]; + DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " + << (url._passwd.size() ? url._passwd : "null") << " " << _rtp_type; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - float publishTimeOutSec = (*this)[kTimeoutMS].as() / 1000.0; - _pPublishTimer.reset( new Timer(publishTimeOutSec, [weakSelf]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + float publish_timeout_sec = (*this)[kTimeoutMS].as() / 1000.0; + _publish_timer.reset(new Timer(publish_timeout_sec, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout"),false); + strong_self->onPublishResult(SockException(Err_timeout, "publish rtsp timeout"), false); return false; - },getPoller())); + }, getPoller())); - if(!(*this)[kNetAdapter].empty()){ + if (!(*this)[kNetAdapter].empty()) { setNetAdapter((*this)[kNetAdapter]); } - startConnect(url._host, url._port, publishTimeOutSec); + startConnect(url._host, url._port, publish_timeout_sec); } -void RtspPusher::onPublishResult(const SockException &ex, bool handshakeCompleted) { - if(!handshakeCompleted){ +void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { + if (ex.getErrCode() == Err_shutdown) { + //主动shutdown的,不触发回调 + return; + } + if (!handshake_done) { //播放结果回调 - _pPublishTimer.reset(); - if(_onPublished){ - _onPublished(ex); + _publish_timer.reset(); + if (_on_published) { + _on_published(ex); } } else { //播放成功后异常断开回调 - if(_onShutdown){ - _onShutdown(ex); + if (_on_shutdown) { + _on_shutdown(ex); } } - if(ex){ + if (ex) { teardown(); } } void RtspPusher::onErr(const SockException &ex) { //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex,!_pPublishTimer); + onPublishResult(ex, !_publish_timer); } void RtspPusher::onConnect(const SockException &err) { - if(err) { - onPublishResult(err,false); + if (err) { + onPublishResult(err, false); return; } //推流器不需要多大的接收缓存,节省内存占用 @@ -121,41 +126,40 @@ void RtspPusher::onConnect(const SockException &err) { sendAnnounce(); } -void RtspPusher::onRecv(const Buffer::Ptr &pBuf){ +void RtspPusher::onRecv(const Buffer::Ptr &buf){ try { - input(pBuf->data(), pBuf->size()); + input(buf->data(), buf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex,!_pPublishTimer); + onPublishResult(ex, !_publish_timer); } } void RtspPusher::onWholeRtspPacket(Parser &parser) { - decltype(_onHandshake) fun; - _onHandshake.swap(fun); - if(fun){ - fun(parser); + decltype(_on_res_func) func; + _on_res_func.swap(func); + if (func) { + func(parser); } parser.Clear(); } - void RtspPusher::sendAnnounce() { - auto src = _pMediaSrc.lock(); + auto src = _push_src.lock(); if (!src) { throw std::runtime_error("the media source was released"); } //解析sdp - _sdpParser.load(src->getSdp()); - _aTrackInfo = _sdpParser.getAvailableTrack(); + _sdp_parser.load(src->getSdp()); + _track_vec = _sdp_parser.getAvailableTrack(); - if (_aTrackInfo.empty()) { + if (_track_vec.empty()) { throw std::runtime_error("无有效的Sdp Track"); } - _onHandshake = std::bind(&RtspPusher::handleResAnnounce,this, placeholders::_1); - sendRtspRequest("ANNOUNCE",_strUrl,{},src->getSdp()); + _on_res_func = std::bind(&RtspPusher::handleResAnnounce, this, placeholders::_1); + sendRtspRequest("ANNOUNCE", _url, {}, src->getSdp()); } void RtspPusher::handleResAnnounce(const Parser &parser) { @@ -165,9 +169,9 @@ void RtspPusher::handleResAnnounce(const Parser &parser) { sendAnnounce(); return; } - if(parser.Url() == "302"){ + if (parser.Url() == "302") { auto newUrl = parser["Location"]; - if(newUrl.empty()){ + if (newUrl.empty()) { throw std::runtime_error("未找到Location字段(跳转url)"); } publish(newUrl); @@ -176,45 +180,45 @@ void RtspPusher::handleResAnnounce(const Parser &parser) { if (parser.Url() != "200") { throw std::runtime_error(StrPrinter << "ANNOUNCE:" << parser.Url() << " " << parser.Tail()); } - _strContentBase = parser["Content-Base"]; + _content_base = parser["Content-Base"]; - if(_strContentBase.empty()){ - _strContentBase = _strUrl; + if (_content_base.empty()) { + _content_base = _url; } - if (_strContentBase.back() == '/') { - _strContentBase.pop_back(); + if (_content_base.back() == '/') { + _content_base.pop_back(); } sendSetup(0); } -bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { - if(!_rtspRealm.empty()){ +bool RtspPusher::handleAuthenticationFailure(const string ¶ms_str) { + if (!_realm.empty()) { //已经认证过了 return false; } - char *realm = new char[paramsStr.size()]; - char *nonce = new char[paramsStr.size()]; - char *stale = new char[paramsStr.size()]; - onceToken token(nullptr,[&](){ + char *realm = new char[params_str.size()]; + char *nonce = new char[params_str.size()]; + char *stale = new char[params_str.size()]; + onceToken token(nullptr, [&]() { delete[] realm; delete[] nonce; delete[] stale; }); - if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) { - _rtspRealm = (const char *)realm; - _rtspMd5Nonce = (const char *)nonce; + if (sscanf(params_str.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) { + _realm = (const char *) realm; + _nonce = (const char *) nonce; return true; } - if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) { - _rtspRealm = (const char *)realm; - _rtspMd5Nonce = (const char *)nonce; + if (sscanf(params_str.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) { + _realm = (const char *) realm; + _nonce = (const char *) nonce; return true; } - if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) { - _rtspRealm = (const char *)realm; + if (sscanf(params_str.data(), "Basic realm=\"%[^\"]\"", realm) == 1) { + _realm = (const char *) realm; return true; } return false; @@ -222,30 +226,33 @@ bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { //有必要的情况下创建udp端口 void RtspPusher::createUdpSockIfNecessary(int track_idx){ - auto &rtpSockRef = _apUdpSock[track_idx]; - if(!rtpSockRef){ - rtpSockRef.reset(new Socket(getPoller())); + auto &rtp_sock = _udp_socks[track_idx]; + if (!rtp_sock) { + rtp_sock.reset(new Socket(getPoller())); //rtp随机端口 - if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) { - rtpSockRef.reset(); + if (!rtp_sock->bindUdpSock(0, get_local_ip().data())) { + rtp_sock.reset(); throw std::runtime_error("open rtp sock failed"); } } } -void RtspPusher::sendSetup(unsigned int trackIndex) { - _onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex); - auto &track = _aTrackInfo[trackIndex]; - auto baseUrl = _strContentBase + "/" + track->_control_surffix; - switch (_eType) { +void RtspPusher::sendSetup(unsigned int track_idx) { + _on_res_func = std::bind(&RtspPusher::handleResSetup, this, placeholders::_1, track_idx); + auto &track = _track_vec[track_idx]; + auto base_url = _content_base + "/" + track->_control_surffix; + switch (_rtp_type) { case Rtsp::RTP_TCP: { - sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); + sendRtspRequest("SETUP", base_url, {"Transport", + StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 + << "-" << track->_type * 2 + 1}); } break; case Rtsp::RTP_UDP: { - createUdpSockIfNecessary(trackIndex); - int port = _apUdpSock[trackIndex]->get_local_port(); - sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); + createUdpSockIfNecessary(track_idx); + int port = _udp_socks[track_idx]->get_local_port(); + sendRtspRequest("SETUP", base_url, + {"Transport", StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } break; default: @@ -254,42 +261,41 @@ void RtspPusher::sendSetup(unsigned int trackIndex) { } -void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) { +void RtspPusher::handleResSetup(const Parser &parser, unsigned int track_idx) { if (parser.Url() != "200") { - throw std::runtime_error( - StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); + throw std::runtime_error(StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); } - if (uiTrackIndex == 0) { - _strSession = parser["Session"]; - _strSession.append(";"); - _strSession = FindField(_strSession.data(), nullptr, ";"); + if (track_idx == 0) { + _session_id = parser["Session"]; + _session_id.append(";"); + _session_id = FindField(_session_id.data(), nullptr, ";"); } - auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ - _eType = Rtsp::RTP_TCP; - string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); - _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data()); - }else if(strTransport.find("multicast") != string::npos){ + auto transport = parser["Transport"]; + if (transport.find("TCP") != string::npos || transport.find("interleaved") != string::npos) { + _rtp_type = Rtsp::RTP_TCP; + string interleaved = FindField(FindField((transport + ";").data(), "interleaved=", ";").data(), NULL, "-"); + _track_vec[track_idx]->_interleaved = atoi(interleaved.data()); + } else if (transport.find("multicast") != string::npos) { throw std::runtime_error("SETUP rtsp pusher can not support multicast!"); - }else{ - _eType = Rtsp::RTP_UDP; - createUdpSockIfNecessary(uiTrackIndex); - const char *strPos = "server_port=" ; - auto port_str = FindField((strTransport + ";").data(), strPos, ";"); + } else { + _rtp_type = Rtsp::RTP_UDP; + createUdpSockIfNecessary(track_idx); + const char *strPos = "server_port="; + auto port_str = FindField((transport + ";").data(), strPos, ";"); uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); struct sockaddr_in rtpto; rtpto.sin_port = ntohs(port); rtpto.sin_family = AF_INET; rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); - _apUdpSock[uiTrackIndex]->setSendPeerAddr((struct sockaddr *)&(rtpto)); + _udp_socks[track_idx]->setSendPeerAddr((struct sockaddr *) &(rtpto)); } - RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP); + RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP); - if (uiTrackIndex < _aTrackInfo.size() - 1) { + if (track_idx < _track_vec.size() - 1) { //需要继续发送SETUP命令 - sendSetup(uiTrackIndex + 1); + sendSetup(track_idx + 1); return; } @@ -297,13 +303,12 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) } void RtspPusher::sendOptions() { - _onHandshake = [this](const Parser& parser){}; - sendRtspRequest("OPTIONS",_strContentBase); + _on_res_func = [this](const Parser &parser) {}; + sendRtspRequest("OPTIONS", _content_base); } inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { - //InfoL<<(int)pkt.Interleaved; - switch (_eType) { + switch (_rtp_type) { case Rtsp::RTP_TCP: { int i = 0; int size = pkt->size(); @@ -315,85 +320,84 @@ inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) BufferRtp::Ptr buffer(new BufferRtp(rtp)); send(buffer); }); - } break; + } + case Rtsp::RTP_UDP: { int i = 0; int size = pkt->size(); pkt->for_each([&](const RtpPacket::Ptr &rtp) { int iTrackIndex = getTrackIndexByTrackType(rtp->type); - auto &pSock = _apUdpSock[iTrackIndex]; + auto &pSock = _udp_socks[iTrackIndex]; if (!pSock) { - shutdown(SockException(Err_shutdown,"udp sock not opened yet")); + shutdown(SockException(Err_shutdown, "udp sock not opened yet")); return; } - BufferRtp::Ptr buffer(new BufferRtp(rtp,4)); + BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); pSock->send(buffer, nullptr, 0, ++i == size); }); + break; } - break; - default: - break; + default : break; } } inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (type == _aTrackInfo[i]->_type) { + for (unsigned int i = 0; i < _track_vec.size(); i++) { + if (type == _track_vec[i]->_type) { return i; } } - if(_aTrackInfo.size() == 1){ + if (_track_vec.size() == 1) { return 0; } throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type); } void RtspPusher::sendRecord() { - _onHandshake = [this](const Parser& parser){ - auto src = _pMediaSrc.lock(); + _on_res_func = [this](const Parser &parser) { + auto src = _push_src.lock(); if (!src) { throw std::runtime_error("the media source was released"); } - _pRtspReader = src->getRing()->attach(getPoller()); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtspReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pkt){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + _rtsp_reader = src->getRing()->attach(getPoller()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _rtsp_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->sendRtpPacket(pkt); + strong_self->sendRtpPacket(pkt); }); - _pRtspReader->setDetachCB([weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer); + _rtsp_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); } }); - if(_eType != Rtsp::RTP_TCP){ + if (_rtp_type != Rtsp::RTP_TCP) { /////////////////////////心跳///////////////////////////////// - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf){ + _beat_timer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - strongSelf->sendOptions(); + strong_self->sendOptions(); return true; - },getPoller())); + }, getPoller())); } - onPublishResult(SockException(Err_success,"success"), false); + onPublishResult(SockException(Err_success, "success"), false); //提升发送性能 setSocketFlags(); }; - sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); + sendRtspRequest("RECORD", _content_base, {"Range", "npt=0.000-"}); } void RtspPusher::setSocketFlags(){ - GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); - if(mergeWriteMS > 0) { + GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); + if (merge_write_ms > 0) { //提高发送性能 setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); SockUtil::setNoDelay(_sock->rawFD(), false); @@ -404,26 +408,26 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std string key; StrCaseMap header_map; int i = 0; - for(auto &val : header){ - if(++i % 2 == 0){ - header_map.emplace(key,val); - }else{ + for (auto &val : header) { + if (++i % 2 == 0) { + header_map.emplace(key, val); + } else { key = val; } } - sendRtspRequest(cmd,url,header_map,sdp); + sendRtspRequest(cmd, url, header_map, sdp); } void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const,const string &sdp ) { auto header = header_const; - header.emplace("CSeq",StrPrinter << _uiCseq++); - header.emplace("User-Agent",SERVER_NAME); + header.emplace("CSeq", StrPrinter << _cseq++); + header.emplace("User-Agent", SERVER_NAME); - if(!_strSession.empty()){ - header.emplace("Session",_strSession); + if (!_session_id.empty()) { + header.emplace("Session", _session_id); } - if(!_rtspRealm.empty() && !(*this)[kRtspUser].empty()){ - if(!_rtspMd5Nonce.empty()){ + if (!_realm.empty() && !(*this)[kRtspUser].empty()) { + if (!_nonce.empty()) { //MD5认证 /* response计算方法如下: @@ -434,41 +438,41 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC response= md5( md5(username:realm:password):nonce:md5(public_method:url) ); */ string encrypted_pwd = (*this)[kRtspPwd]; - if(!(*this)[kRtspPwdIsMD5].as()){ - encrypted_pwd = MD5((*this)[kRtspUser]+ ":" + _rtspRealm + ":" + encrypted_pwd).hexdigest(); + if (!(*this)[kRtspPwdIsMD5].as()) { + encrypted_pwd = MD5((*this)[kRtspUser] + ":" + _realm + ":" + encrypted_pwd).hexdigest(); } - auto response = MD5( encrypted_pwd + ":" + _rtspMd5Nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); + auto response = MD5(encrypted_pwd + ":" + _nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); _StrPrinter printer; printer << "Digest "; printer << "username=\"" << (*this)[kRtspUser] << "\", "; - printer << "realm=\"" << _rtspRealm << "\", "; - printer << "nonce=\"" << _rtspMd5Nonce << "\", "; + printer << "realm=\"" << _realm << "\", "; + printer << "nonce=\"" << _nonce << "\", "; printer << "uri=\"" << url << "\", "; printer << "response=\"" << response << "\""; - header.emplace("Authorization",printer); - }else if(!(*this)[kRtspPwdIsMD5].as()){ + header.emplace("Authorization", printer); + } else if (!(*this)[kRtspPwdIsMD5].as()) { //base64认证 string authStr = StrPrinter << (*this)[kRtspUser] << ":" << (*this)[kRtspPwd]; char authStrBase64[1024] = {0}; - av_base64_encode(authStrBase64,sizeof(authStrBase64),(uint8_t *)authStr.data(),authStr.size()); - header.emplace("Authorization",StrPrinter << "Basic " << authStrBase64 ); + av_base64_encode(authStrBase64, sizeof(authStrBase64), (uint8_t *) authStr.data(), authStr.size()); + header.emplace("Authorization", StrPrinter << "Basic " << authStrBase64); } } - if(!sdp.empty()){ - header.emplace("Content-Length",StrPrinter << sdp.size()); - header.emplace("Content-Type","application/sdp"); + if (!sdp.empty()) { + header.emplace("Content-Length", StrPrinter << sdp.size()); + header.emplace("Content-Type", "application/sdp"); } _StrPrinter printer; printer << cmd << " " << url << " RTSP/1.0\r\n"; - for (auto &pr : header){ + for (auto &pr : header) { printer << pr.first << ": " << pr.second << "\r\n"; } printer << "\r\n"; - if(!sdp.empty()){ + if (!sdp.empty()) { printer << sdp; } SockSender::send(printer); diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index ccc4c76f..5f521763 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -31,39 +31,39 @@ class RtspPusher : public TcpClient, public RtspSplitter, public PusherBase { public: typedef std::shared_ptr Ptr; RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src); - virtual ~RtspPusher(); - - void publish(const string &strUrl) override; - + ~RtspPusher() override; + void publish(const string &url) override; void teardown() override; void setOnPublished(const Event &cb) override { - _onPublished = cb; + _on_published = cb; } void setOnShutdown(const Event & cb) override{ - _onShutdown = cb; + _on_shutdown = cb; } + protected: //for Tcpclient override - void onRecv(const Buffer::Ptr &pBuf) override; + void onRecv(const Buffer::Ptr &buf) override; void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; //RtspSplitter override void onWholeRtspPacket(Parser &parser) override ; void onRtpPacket(const char *data,uint64_t len) override {}; + private: - void onPublishResult(const SockException &ex, bool handshakeCompleted); + void onPublishResult(const SockException &ex, bool handshake_done); void sendAnnounce(); - void sendSetup(unsigned int uiTrackIndex); + void sendSetup(unsigned int track_idx); void sendRecord(); void sendOptions(); void handleResAnnounce(const Parser &parser); - void handleResSetup(const Parser &parser, unsigned int uiTrackIndex); - bool handleAuthenticationFailure(const string ¶msStr); + void handleResSetup(const Parser &parser, unsigned int track_idx); + bool handleAuthenticationFailure(const string ¶ms_str); inline int getTrackIndexByTrackType(TrackType type); @@ -73,33 +73,30 @@ private: void createUdpSockIfNecessary(int track_idx); void setSocketFlags(); + private: + unsigned int _cseq = 1; + Rtsp::eRtpType _rtp_type = Rtsp::RTP_TCP; + //rtsp鉴权相关 - string _rtspMd5Nonce; - string _rtspRealm; - + string _nonce; + string _realm; + string _url; + string _session_id; + string _content_base; + SdpParser _sdp_parser; + vector _track_vec; + Socket::Ptr _udp_socks[2]; //超时功能实现 - std::shared_ptr _pPublishTimer; - //源 - std::weak_ptr _pMediaSrc; - RtspMediaSource::RingType::RingReader::Ptr _pRtspReader; - //事件监听 - Event _onShutdown; - Event _onPublished; - - string _strUrl; - SdpParser _sdpParser; - vector _aTrackInfo; - string _strSession; - unsigned int _uiCseq = 1; - string _strContentBase; - Rtsp::eRtpType _eType = Rtsp::RTP_TCP; - Socket::Ptr _apUdpSock[2]; - function _onHandshake; + std::shared_ptr _publish_timer; //心跳定时器 - std::shared_ptr _pBeatTimer; - - + std::shared_ptr _beat_timer; + std::weak_ptr _push_src; + RtspMediaSource::RingType::RingReader::Ptr _rtsp_reader; + //事件监听 + Event _on_shutdown; + Event _on_published; + function _on_res_func; }; } /* namespace mediakit */