diff --git a/tests/test_pusher.cpp b/tests/test_pusher.cpp index 626b95a1..49929e56 100644 --- a/tests/test_pusher.cpp +++ b/tests/test_pusher.cpp @@ -40,28 +40,29 @@ using namespace mediakit; //推流器,保持强引用 MediaPusher::Ptr pusher; +Timer::Ptr g_timer; //声明函数 -void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url); +void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url); //创建推流器并开始推流 -void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { +void createPusher(const EventPoller::Ptr &poller, const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { //创建推流器并绑定一个MediaSource - pusher.reset(new MediaPusher(schema,vhost, app, stream)); + pusher.reset(new MediaPusher(schema,vhost, app, stream,poller)); //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp // (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 - pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) { + pusher->setOnShutdown([poller,schema,vhost, app, stream, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重试 - rePushDelay(schema,vhost,app, stream, url); + rePushDelay(poller,schema,vhost,app, stream, url); }); //设置发布结果处理逻辑 - pusher->setOnPublished([schema,vhost, app, stream, url](const SockException &ex) { + pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(schema,vhost,app, stream, url); + rePushDelay(poller,schema,vhost,app, stream, url); } else { InfoL << "Publish success,Please play with player:" << url; } @@ -69,16 +70,15 @@ void createPusher(const string &schema,const string &vhost,const string &app, co pusher->publish(url); } -Timer::Ptr g_timer; //推流失败或断开延迟2秒后重试推流 -void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { - g_timer = std::make_shared(2,[schema,vhost,app, stream, url]() { +void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { + g_timer = std::make_shared(2,[poller,schema,vhost,app, stream, url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(schema,vhost,app, stream, url); + createPusher(poller,schema,vhost,app, stream, url); //此任务不重复 return false; - }, nullptr); + }, poller); } //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 @@ -86,18 +86,19 @@ int domain(const string &playUrl, const string &pushUrl) { //设置日志 Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); + auto poller = EventPollerPool::Instance().getPoller(); //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream")); + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller)); player->play(playUrl.data()); //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, - [pushUrl](BroadcastMediaChangedArgs) { + [pushUrl,poller](BroadcastMediaChangedArgs) { //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 if(bRegist && pushUrl.find(schema) == 0){ - createPusher(schema,vhost,app, stream, pushUrl); + createPusher(poller,schema,vhost,app, stream, pushUrl); } }); diff --git a/tests/test_pusherMp4.cpp b/tests/test_pusherMp4.cpp index fc4e8b0c..fc502d29 100644 --- a/tests/test_pusherMp4.cpp +++ b/tests/test_pusherMp4.cpp @@ -41,10 +41,13 @@ using namespace mediakit; //推流器,保持强引用 MediaPusher::Ptr pusher; +Timer::Ptr g_timer; + //声明函数 //推流失败或断开延迟2秒后重试推流 -void rePushDelay(const string &schema, +void rePushDelay(const EventPoller::Ptr &poller, + const string &schema, const string &vhost, const string &app, const string &stream, @@ -52,7 +55,8 @@ void rePushDelay(const string &schema, const string &url) ; //创建推流器并开始推流 -void createPusher(const string &schema, +void createPusher(const EventPoller::Ptr &poller, + const string &schema, const string &vhost, const string &app, const string &stream, @@ -67,22 +71,22 @@ void createPusher(const string &schema, } //创建推流器并绑定一个MediaSource - pusher.reset(new MediaPusher(src)); + pusher.reset(new MediaPusher(src,poller)); //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp // (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 - pusher->setOnShutdown([schema,vhost,app,stream,filePath, url](const SockException &ex) { + pusher->setOnShutdown([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重新推流 - rePushDelay(schema,vhost,app, stream,filePath, url); + rePushDelay(poller,schema,vhost,app, stream,filePath, url); }); //设置发布结果处理逻辑 - pusher->setOnPublished([schema,vhost,app,stream,filePath, url](const SockException &ex) { + pusher->setOnPublished([poller,schema,vhost,app,stream,filePath, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(schema,vhost,app, stream, filePath ,url); + rePushDelay(poller,schema,vhost,app, stream, filePath ,url); }else { InfoL << "Publish success,Please play with player:" << url; } @@ -90,21 +94,21 @@ void createPusher(const string &schema, pusher->publish(url); } -Timer::Ptr g_timer; //推流失败或断开延迟2秒后重试推流 -void rePushDelay(const string &schema, +void rePushDelay(const EventPoller::Ptr &poller, + const string &schema, const string &vhost, const string &app, const string &stream, const string &filePath, const string &url) { - g_timer = std::make_shared(2,[schema,vhost,app, stream, filePath,url]() { + g_timer = std::make_shared(2,[poller,schema,vhost,app, stream, filePath,url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(schema,vhost,app, stream, filePath,url); + createPusher(poller,schema,vhost,app, stream, filePath,url); //此任务不重复 return false; - }, nullptr); + }, poller); } //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 @@ -112,8 +116,9 @@ int domain(const string & filePath,const string & pushUrl){ //设置日志 Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); - //vhost/app/stream可以随便自己填,现在不限制app应用名了 - createPusher(FindField(pushUrl.data(), nullptr,"://"),DEFAULT_VHOST,"live","stream",filePath,pushUrl); + auto poller = EventPollerPool::Instance().getPoller(); + //vhost/app/stream可以随便自己填,现在不限制app应用名了 + createPusher(poller,FindField(pushUrl.data(), nullptr,"://"),DEFAULT_VHOST,"live","stream",filePath,pushUrl); //设置退出信号处理函数 static semaphore sem;