diff --git a/src/.DS_Store b/src/.DS_Store index 8bd3e911..f04e1d94 100644 Binary files a/src/.DS_Store and b/src/.DS_Store differ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 82076fe8..dc63bebf 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -49,7 +49,7 @@ void RtmpPusher::teardown() { lock_guard lck(m_mtxOnStatusCB); m_dqOnStatusCB.clear(); } - m_pPlayTimer.reset(); + m_pPublishTimer.reset(); clear(); shutdown(); } @@ -63,7 +63,7 @@ void RtmpPusher::publish(const char* strUrl) { m_strTcUrl = string("rtmp://") + strHost + "/" + m_strApp; if (!m_strApp.size() || !m_strStream.size()) { - _onPlayResult(SockException(Err_other,"rtmp url非法")); + onPublishResult(SockException(Err_other,"rtmp url非法")); return; } DebugL << strHost << " " << m_strApp << " " << m_strStream; @@ -80,20 +80,20 @@ void RtmpPusher::publish(const char* strUrl) { } void RtmpPusher::onErr(const SockException &ex){ - _onShutdown(ex); + onShutdown(ex); } void RtmpPusher::onConnect(const SockException &err){ if(err.getErrCode()!=Err_success) { - _onPlayResult(err); + onPublishResult(err); return; } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - m_pPlayTimer.reset( new Timer(10, [weakSelf]() { + m_pPublishTimer.reset( new Timer(10, [weakSelf]() { auto strongSelf=weakSelf.lock(); if(!strongSelf) { return false; } - strongSelf->_onPlayResult(SockException(Err_timeout,"publish rtmp timeout")); + strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->teardown(); return false; })); @@ -111,8 +111,8 @@ void RtmpPusher::onRecv(const Socket::Buffer::Ptr &pBuf){ onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); - _onPlayResult(ex); - _onShutdown(ex); + onPublishResult(ex); + onShutdown(ex); teardown(); } } @@ -193,11 +193,11 @@ inline void RtmpPusher::send_metaData(){ m_pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); if(strongSelf){ - strongSelf->_onShutdown(SockException(Err_other,"媒体源被释放")); + strongSelf->onShutdown(SockException(Err_other,"媒体源被释放")); strongSelf->teardown(); } }); - _onPlayResult(SockException(Err_success,"success")); + onPublishResult(SockException(Err_success,"success")); } void RtmpPusher::onCmd_result(AMFDecoder &dec){ auto iReqId = dec.load(); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index cb5dcf66..12a14aea 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -18,12 +18,21 @@ namespace Rtmp { class RtmpPusher: public RtmpProtocol , public TcpClient{ public: typedef std::shared_ptr Ptr; + typedef std::function Event; RtmpPusher(const char *strApp,const char *strStream); virtual ~RtmpPusher(); void publish(const char* strUrl); void teardown(); + void setOnPublished(Event onPublished) { + m_onPublished = onPublished; + } + + void setOnShutdown(Event onShutdown) { + m_onShutdown = onShutdown; + } + protected: //for Tcpclient @@ -36,19 +45,18 @@ protected: void onSendRawData(const char *pcRawData, int iSize) override { send(pcRawData, iSize); } - - virtual void onShutdown(const SockException &ex){} - virtual void onPlayResult(const SockException &ex) {} private: - void _onShutdown(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); - m_pPlayTimer.reset(); - onShutdown(ex); + void onShutdown(const SockException &ex) { + m_pPublishTimer.reset(); + if(m_onShutdown){ + m_onShutdown(ex); + } } - void _onPlayResult(const SockException &ex) { - WarnL << ex.getErrCode() << " " << ex.what(); - m_pPlayTimer.reset(); - onPlayResult(ex); + void onPublishResult(const SockException &ex) { + m_pPublishTimer.reset(); + if(m_onPublished){ + m_onPublished(ex); + } } template @@ -84,11 +92,14 @@ private: static unordered_map g_mapCmd; //超时功能实现 - std::shared_ptr m_pPlayTimer; + std::shared_ptr m_pPublishTimer; //源 std::weak_ptr m_pMediaSrc; RtmpMediaSource::RingType::RingReader::Ptr m_pRtmpReader; + //事件监听 + Event m_onShutdown; + Event m_onPublished; }; } /* namespace Rtmp */ diff --git a/tests/.DS_Store b/tests/.DS_Store new file mode 100644 index 00000000..d7f2a65a Binary files /dev/null and b/tests/.DS_Store differ diff --git a/tests/test_rtmpPusher.cpp b/tests/test_rtmpPusher.cpp new file mode 100644 index 00000000..11021cde --- /dev/null +++ b/tests/test_rtmpPusher.cpp @@ -0,0 +1,74 @@ +//============================================================================ +// Name : main.cpp +// Author : 熊子良 +// Version : +//============================================================================ + + +#include +#include +#include +#include "Util/logger.h" +#include "Util/onceToken.h" +#include "Util/NoticeCenter.h" +#include "Poller/EventPoller.h" +#include "Device/PlayerProxy.h" +#include "Rtmp/RtmpPusher.h" +#include "Common/config.h" + +using namespace std; +using namespace ZL::Util; +using namespace ZL::Rtmp; +using namespace ZL::Thread; +using namespace ZL::Network; +using namespace ZL::DEV; + +void programExit(int arg) { + EventPoller::Instance().shutdown(); +} +int main(int argc,char *argv[]){ + setExePath(argv[0]); + signal(SIGINT, programExit); + Logger::Instance().add(std::make_shared("stdout", LTrace)); + + PlayerProxy::Ptr player(new PlayerProxy("app","stream")); + //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" + //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请研读MediaReader代码) + player->play("rtmp://live.hkstv.hk.lxdns.com/live/hks"); + + RtmpPusher::Ptr pusher; + //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发。 + NoticeCenter::Instance().addListener(nullptr,Config::Broadcast::kBroadcastRtmpSrcRegisted, + [&pusher](BroadcastRtmpSrcRegistedArgs){ + //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 + const_cast(pusher).reset(new RtmpPusher(app,stream)); + + pusher->setOnPublished([](const SockException &ex){ + if(ex){ + WarnL << "发布失败:" << ex.what(); + }else{ + InfoL << "发布成功,请用播放器打开:rtmp://jizan.iok.la/live/test"; + } + }); + + //推流地址,请改成你自己的服务器。 + //这个范例地址(也是基于mediakit)是可用的,但是带宽只有1mb,访问可能很卡顿。 + pusher->publish("rtmp://jizan.iok.la/live/test"); + //如果你想监听RtmpPusher的相关事件,请派生之并重载 onShutdown 与 onPlayResult方法 + }); + + EventPoller::Instance().runLoop(); + NoticeCenter::Instance().delListener(nullptr); + player.reset(); + pusher.reset(); + + EventPoller::Destory(); + Logger::Destory(); + return 0; +} + + + + + +