From dad0c5e3ccb42884bb0284a48ce5ce0e5e63c8eb Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Tue, 9 Nov 2021 16:46:38 +0800 Subject: [PATCH] =?UTF-8?q?MediaPusher:=20=E6=8A=BD=E8=B1=A1=E7=B2=BE?= =?UTF-8?q?=E7=AE=80=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Pusher/PusherBase.cpp | 10 +++++----- src/Pusher/PusherBase.h | 39 +++++++++++++++++++++++++++------------ src/Rtmp/RtmpPusher.cpp | 24 ++++++++++-------------- src/Rtmp/RtmpPusher.h | 16 +++------------- src/Rtsp/RtspPusher.cpp | 24 ++++++++++-------------- src/Rtsp/RtspPusher.h | 15 +++------------ 6 files changed, 58 insertions(+), 70 deletions(-) diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp index b7e91d89..2916ec4d 100644 --- a/src/Pusher/PusherBase.cpp +++ b/src/Pusher/PusherBase.cpp @@ -30,22 +30,22 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller, string prefix = FindField(strUrl.data(), NULL, "://"); if (strcasecmp("rtsps",prefix.data()) == 0) { - return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); } if (strcasecmp("rtsp",prefix.data()) == 0) { - return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); + return PusherBase::Ptr(new RtspPusherImp(poller,dynamic_pointer_cast(src)),releasePusher); } if (strcasecmp("rtmps",prefix.data()) == 0) { - return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); } if (strcasecmp("rtmp",prefix.data()) == 0) { - return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast(src)),releasePusher); + return PusherBase::Ptr(new RtmpPusherImp(poller,dynamic_pointer_cast(src)),releasePusher); } - return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); + return PusherBase::Ptr(new RtspPusherImp(poller,dynamic_pointer_cast(src)),releasePusher); } PusherBase::PusherBase() { diff --git a/src/Pusher/PusherBase.h b/src/Pusher/PusherBase.h index 3538118f..6bf70aab 100644 --- a/src/Pusher/PusherBase.h +++ b/src/Pusher/PusherBase.h @@ -38,12 +38,12 @@ public: * 开始推流 * @param strUrl 视频url,支持rtsp/rtmp */ - virtual void publish(const string &strUrl) = 0; + virtual void publish(const string &strUrl) {}; /** * 中断推流 */ - virtual void teardown() = 0; + virtual void teardown() {}; /** * 摄像推流结果回调 @@ -54,6 +54,10 @@ public: * 设置断开回调 */ virtual void setOnShutdown(const Event &cb) = 0; + +protected: + virtual void onShutdown(const SockException &ex) = 0; + virtual void onPublishResult(const SockException &ex) = 0; }; template @@ -67,21 +71,21 @@ public: /** * 开始推流 - * @param strUrl 推流url,支持rtsp/rtmp + * @param url 推流url,支持rtsp/rtmp */ - void publish(const string &strUrl) override { - if (_delegate) { - _delegate->publish(strUrl); - } + void publish(const string &url) override { + return _delegate ? _delegate->publish(url) : Parent::publish(url); } /** * 中断推流 */ void teardown() override { - if (_delegate) { - _delegate->teardown(); - } + return _delegate ? _delegate->teardown() : Parent::teardown(); + } + + std::shared_ptr getSockInfo() const { + return dynamic_pointer_cast(_delegate); } /** @@ -104,8 +108,19 @@ public: _on_shutdown = cb; } - std::shared_ptr getSockInfo() const { - return dynamic_pointer_cast(_delegate); +protected: + void onShutdown(const SockException &ex) override { + if (_on_shutdown) { + _on_shutdown(ex); + _on_shutdown = nullptr; + } + } + + void onPublishResult(const SockException &ex) override { + if (_on_publish) { + _on_publish(ex); + _on_publish = nullptr; + } } protected: diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 02facae1..7a03c830 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -46,7 +46,7 @@ void RtmpPusher::teardown() { } } -void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { +void RtmpPusher::onPublishResult_l(const SockException &ex, bool handshake_done) { DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 @@ -55,14 +55,10 @@ void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { if (!handshake_done) { //播放结果回调 _publish_timer.reset(); - if (_on_published) { - _on_published(ex); - } + onPublishResult(ex); } else { //播放成功后异常断开回调 - if (_on_shutdown) { - _on_shutdown(ex); - } + onShutdown(ex); } if (ex) { @@ -78,7 +74,7 @@ void RtmpPusher::publish(const string &url) { _tc_url = string("rtmp://") + host_url + "/" + _app; if (!_app.size() || !_stream_id.size()) { - onPublishResult(SockException(Err_other, "rtmp url非法"), false); + onPublishResult_l(SockException(Err_other, "rtmp url非法"), false); return; } DebugL << host_url << " " << _app << " " << _stream_id; @@ -99,7 +95,7 @@ void RtmpPusher::publish(const string &url) { if (!strongSelf) { return false; } - strongSelf->onPublishResult(SockException(Err_timeout, "publish rtmp timeout"), false); + strongSelf->onPublishResult_l(SockException(Err_timeout, "publish rtmp timeout"), false); return false; }, getPoller())); @@ -112,12 +108,12 @@ void RtmpPusher::publish(const string &url) { void RtmpPusher::onErr(const SockException &ex){ //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex, !_publish_timer); + onPublishResult_l(ex, !_publish_timer); } void RtmpPusher::onConnect(const SockException &err){ if (err) { - onPublishResult(err, false); + onPublishResult_l(err, false); return; } weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); @@ -138,7 +134,7 @@ void RtmpPusher::onRecv(const Buffer::Ptr &buf){ } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex, !_publish_timer); + onPublishResult_l(ex, !_publish_timer); } } @@ -226,10 +222,10 @@ inline void RtmpPusher::send_metaData(){ _rtmp_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (strong_self) { - strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); + strong_self->onPublishResult_l(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); } }); - onPublishResult(SockException(Err_success, "success"), false); + onPublishResult_l(SockException(Err_success, "success"), false); //提升发送性能 setSocketFlags(); } diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 0e5428be..3f119ec1 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -27,14 +27,6 @@ public: void publish(const string &url) override ; void teardown() override; - void setOnPublished(const Event &cb) override { - _on_published = cb; - } - - void setOnShutdown(const Event &cb) override{ - _on_shutdown = cb; - } - protected: //for Tcpclient override void onRecv(const Buffer::Ptr &buf) override; @@ -48,7 +40,7 @@ protected: } private: - void onPublishResult(const SockException &ex, bool handshake_done); + void onPublishResult_l(const SockException &ex, bool handshake_done); template inline void addOnResultCB(const FUN &fun) { @@ -81,16 +73,14 @@ private: deque > _deque_on_status; unordered_map > _map_on_result; - //事件监听 - Event _on_shutdown; - Event _on_published; - //推流超时定时器 std::shared_ptr _publish_timer; std::weak_ptr _publish_src; RtmpMediaSource::RingType::RingReader::Ptr _rtmp_reader; }; +using RtmpPusherImp = PusherImp; + } /* namespace mediakit */ #endif /* SRC_RTMP_RTMPPUSHER_H_ */ diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index a2d9604a..f754466a 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -57,7 +57,7 @@ void RtspPusher::teardown() { void RtspPusher::publish(const string &url_str) { RtspUrl url; if (!url.parse(url_str)) { - onPublishResult(SockException(Err_other, StrPrinter << "illegal rtsp url:" << url_str), false); + onPublishResult_l(SockException(Err_other, StrPrinter << "illegal rtsp url:" << url_str), false); return; } @@ -83,7 +83,7 @@ void RtspPusher::publish(const string &url_str) { if (!strong_self) { return false; } - strong_self->onPublishResult(SockException(Err_timeout, "publish rtsp timeout"), false); + strong_self->onPublishResult_l(SockException(Err_timeout, "publish rtsp timeout"), false); return false; }, getPoller())); @@ -94,7 +94,7 @@ void RtspPusher::publish(const string &url_str) { startConnect(url._host, url._port, publish_timeout_sec); } -void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { +void RtspPusher::onPublishResult_l(const SockException &ex, bool handshake_done) { DebugL << ex.what(); if (ex.getErrCode() == Err_shutdown) { //主动shutdown的,不触发回调 @@ -103,14 +103,10 @@ void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { if (!handshake_done) { //播放结果回调 _publish_timer.reset(); - if (_on_published) { - _on_published(ex); - } + onPublishResult(ex); } else { //播放成功后异常断开回调 - if (_on_shutdown) { - _on_shutdown(ex); - } + onShutdown(ex); } if (ex) { @@ -120,12 +116,12 @@ void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { void RtspPusher::onErr(const SockException &ex) { //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex, !_publish_timer); + onPublishResult_l(ex, !_publish_timer); } void RtspPusher::onConnect(const SockException &err) { if (err) { - onPublishResult(err, false); + onPublishResult_l(err, false); return; } sendAnnounce(); @@ -137,7 +133,7 @@ void RtspPusher::onRecv(const Buffer::Ptr &buf){ } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPublishTimer为空后表明握手结束了 - onPublishResult(ex, !_publish_timer); + onPublishResult_l(ex, !_publish_timer); } } @@ -465,7 +461,7 @@ void RtspPusher::sendRecord() { _rtsp_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (strong_self) { - strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); + strong_self->onPublishResult_l(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); } }); if (_rtp_type != Rtsp::RTP_TCP) { @@ -479,7 +475,7 @@ void RtspPusher::sendRecord() { return true; }, getPoller())); } - onPublishResult(SockException(Err_success, "success"), false); + onPublishResult_l(SockException(Err_success, "success"), false); //提升发送性能 setSocketFlags(); }; diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index 5958d0b1..56e8a2b4 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -36,14 +36,6 @@ public: void publish(const string &url) override; void teardown() override; - void setOnPublished(const Event &cb) override { - _on_published = cb; - } - - void setOnShutdown(const Event & cb) override{ - _on_shutdown = cb; - } - protected: //for Tcpclient override void onRecv(const Buffer::Ptr &buf) override; @@ -57,7 +49,7 @@ protected: virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data, size_t len); private: - void onPublishResult(const SockException &ex, bool handshake_done); + void onPublishResult_l(const SockException &ex, bool handshake_done); void sendAnnounce(); void sendSetup(unsigned int track_idx); @@ -102,9 +94,6 @@ private: std::shared_ptr _beat_timer; std::weak_ptr _push_src; RtspMediaSource::RingType::RingReader::Ptr _rtsp_reader; - //事件监听 - Event _on_shutdown; - Event _on_published; function _on_res_func; ////////// rtcp //////////////// //rtcp发送时间,trackid idx 为数组下标 @@ -113,5 +102,7 @@ private: vector _rtcp_context; }; +using RtspPusherImp = PusherImp; + } /* namespace mediakit */ #endif //ZLMEDIAKIT_RTSPPUSHER_H