From 568d8ad53fbacfbb40ae1f8d2ffcd48e2d2e0816 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 6 Jun 2017 20:06:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Rtmp=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/.DS_Store | Bin 10244 -> 12292 bytes src/Rtmp/RtmpPusher.cpp | 20 +++++------ src/Rtmp/RtmpPusher.h | 35 +++++++++++------- tests/.DS_Store | Bin 0 -> 6148 bytes tests/test_rtmpPusher.cpp | 74 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 107 insertions(+), 22 deletions(-) create mode 100644 tests/.DS_Store create mode 100644 tests/test_rtmpPusher.cpp diff --git a/src/.DS_Store b/src/.DS_Store index 8bd3e911869f6affcae48eddc63de9e7be96d99e..f04e1d9450bda71d45db062c7fac2fa5ca0337f7 100644 GIT binary patch delta 326 zcmZn(Xh~3DU|?W$DortDV2}VZIe-{M3vdI8HU0~Ae zVa}A|=Pg4Z)WFU;b0Wq%%jlGIC+ht&gP3s+H4aii%qr{5S^?fAip_5U^!5o zjvzD09H8;sK*AMd#m0-@nJ4qh=rS@)HsMj`V1ih$F*!wM=4N5FmrOuISQr>M831K9 BBkceH diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 82076fe8..dc63bebf 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -49,7 +49,7 @@ void RtmpPusher::teardown() { lock_guard lck(m_mtxOnStatusCB); m_dqOnStatusCB.clear(); } - m_pPlayTimer.reset(); + m_pPublishTimer.reset(); clear(); shutdown(); } @@ -63,7 +63,7 @@ void RtmpPusher::publish(const char* strUrl) { m_strTcUrl = string("rtmp://") + strHost + "/" + m_strApp; if (!m_strApp.size() || !m_strStream.size()) { - _onPlayResult(SockException(Err_other,"rtmp url非法")); + onPublishResult(SockException(Err_other,"rtmp url非法")); return; } DebugL << strHost << " " << m_strApp << " " << m_strStream; @@ -80,20 +80,20 @@ void RtmpPusher::publish(const char* strUrl) { } void RtmpPusher::onErr(const SockException &ex){ - _onShutdown(ex); + onShutdown(ex); } void RtmpPusher::onConnect(const SockException &err){ if(err.getErrCode()!=Err_success) { - _onPlayResult(err); + onPublishResult(err); return; } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - m_pPlayTimer.reset( new Timer(10, [weakSelf]() { + m_pPublishTimer.reset( new Timer(10, [weakSelf]() { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } - strongSelf->_onPlayResult(SockException(Err_timeout,"publish rtmp timeout")); + strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->teardown(); return false; })); @@ -111,8 +111,8 @@ void RtmpPusher::onRecv(const Socket::Buffer::Ptr &pBuf){ onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); - _onPlayResult(ex); - _onShutdown(ex); + onPublishResult(ex); + onShutdown(ex); teardown(); } } @@ -193,11 +193,11 @@ inline void RtmpPusher::send_metaData(){ m_pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); if(strongSelf){ - strongSelf->_onShutdown(SockException(Err_other,"媒体源被释放")); + strongSelf->onShutdown(SockException(Err_other,"媒体源被释放")); strongSelf->teardown(); } }); - _onPlayResult(SockException(Err_success,"success")); + onPublishResult(SockException(Err_success,"success")); } void RtmpPusher::onCmd_result(AMFDecoder &dec){ auto iReqId = dec.load(); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index cb5dcf66..12a14aea 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -18,12 +18,21 @@ namespace Rtmp { class RtmpPusher: public RtmpProtocol , public TcpClient{ public: typedef std::shared_ptr Ptr; + typedef std::function Event; RtmpPusher(const char *strApp,const char *strStream); virtual ~RtmpPusher(); void publish(const char* strUrl); void teardown(); + void setOnPublished(Event onPublished) { + m_onPublished = onPublished; + } + + void setOnShutdown(Event onShutdown) { + m_onShutdown = onShutdown; + } + protected: //for Tcpclient @@ -36,19 +45,18 @@ protected: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - - virtual void onShutdown(const SockException &ex){} - virtual void onPlayResult(const SockException &ex) {} private: - void _onShutdown(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); - m_pPlayTimer.reset(); - onShutdown(ex); + void onShutdown(const SockException &ex) { + m_pPublishTimer.reset(); + if(m_onShutdown){ + m_onShutdown(ex); + } } - void _onPlayResult(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); - m_pPlayTimer.reset(); - onPlayResult(ex); + void onPublishResult(const SockException &ex) { + m_pPublishTimer.reset(); + if(m_onPublished){ + m_onPublished(ex); + } } template @@ -84,11 +92,14 @@ private: static unordered_map g_mapCmd; //超时功能实现 - std::shared_ptr m_pPlayTimer; + std::shared_ptr m_pPublishTimer; //源 std::weak_ptr m_pMediaSrc; RtmpMediaSource::RingType::RingReader::Ptr m_pRtmpReader; + //事件监听 + Event m_onShutdown; + Event m_onPublished; }; } /* namespace Rtmp */ diff --git a/tests/.DS_Store b/tests/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d7f2a65a5f94692fd046a7e8879f247bc5e3f6a9 GIT binary patch literal 6148 zcmeHKyKVw85Znz{oRo%=(!anTSm+`lrOXdRa1=;L6ey{$%6IW;%sz@BLkA6-mDXdg zcWilz+glOQ)z7#TnTtpdHr~nn90#twsJXC=!ul@T&Pvv1$fC{{V0``3 +#include +#include +#include "Util/logger.h" +#include "Util/onceToken.h" +#include "Util/NoticeCenter.h" +#include "Poller/EventPoller.h" +#include "Device/PlayerProxy.h" +#include "Rtmp/RtmpPusher.h" +#include "Common/config.h" + +using namespace std; +using namespace ZL::Util; +using namespace ZL::Rtmp; +using namespace ZL::Thread; +using namespace ZL::Network; +using namespace ZL::DEV; + +void programExit(int arg) { + EventPoller::Instance().shutdown(); +} +int main(int argc,char *argv[]){ + setExePath(argv[0]); + signal(SIGINT, programExit); + Logger::Instance().add(std::make_shared("stdout", LTrace)); + + PlayerProxy::Ptr player(new PlayerProxy("app","stream")); + //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" + //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请研读MediaReader代码) + player->play("rtmp://live.hkstv.hk.lxdns.com/live/hks"); + + RtmpPusher::Ptr pusher; + //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发。 + NoticeCenter::Instance().addListener(nullptr,Config::Broadcast::kBroadcastRtmpSrcRegisted, + [&pusher](BroadcastRtmpSrcRegistedArgs){ + //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 + const_cast(pusher).reset(new RtmpPusher(app,stream)); + + pusher->setOnPublished([](const SockException &ex){ + if(ex){ + WarnL << "发布失败:" << ex.what(); + }else{ + InfoL << "发布成功,请用播放器打开:rtmp://jizan.iok.la/live/test"; + } + }); + + //推流地址,请改成你自己的服务器。 + //这个范例地址(也是基于mediakit)是可用的,但是带宽只有1mb,访问可能很卡顿。 + pusher->publish("rtmp://jizan.iok.la/live/test"); + //如果你想监听RtmpPusher的相关事件,请派生之并重载 onShutdown 与 onPlayResult方法 + }); + + EventPoller::Instance().runLoop(); + NoticeCenter::Instance().delListener(nullptr); + player.reset(); + pusher.reset(); + + EventPoller::Destory(); + Logger::Destory(); + return 0; +} + + + + + +