MediaPusher: 抽象精简代码

This commit is contained in:
ziyue 2021-11-09 16:46:38 +08:00
parent b96a2291eb
commit dad0c5e3cc
6 changed files with 58 additions and 70 deletions

View File

@ -30,22 +30,22 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller,
string prefix = FindField(strUrl.data(), NULL, "://"); string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsps",prefix.data()) == 0) { if (strcasecmp("rtsps",prefix.data()) == 0) {
return PusherBase::Ptr(new TcpClientWithSSL<RtspPusher>(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher); return PusherBase::Ptr(new TcpClientWithSSL<RtspPusherImp>(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
} }
if (strcasecmp("rtsp",prefix.data()) == 0) { if (strcasecmp("rtsp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher); return PusherBase::Ptr(new RtspPusherImp(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
} }
if (strcasecmp("rtmps",prefix.data()) == 0) { if (strcasecmp("rtmps",prefix.data()) == 0) {
return PusherBase::Ptr(new TcpClientWithSSL<RtmpPusher>(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher); return PusherBase::Ptr(new TcpClientWithSSL<RtmpPusherImp>(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
} }
if (strcasecmp("rtmp",prefix.data()) == 0) { if (strcasecmp("rtmp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher); return PusherBase::Ptr(new RtmpPusherImp(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
} }
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher); return PusherBase::Ptr(new RtspPusherImp(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
} }
PusherBase::PusherBase() { PusherBase::PusherBase() {

View File

@ -38,12 +38,12 @@ public:
* *
* @param strUrl urlrtsp/rtmp * @param strUrl urlrtsp/rtmp
*/ */
virtual void publish(const string &strUrl) = 0; virtual void publish(const string &strUrl) {};
/** /**
* *
*/ */
virtual void teardown() = 0; virtual void teardown() {};
/** /**
* *
@ -54,6 +54,10 @@ public:
* *
*/ */
virtual void setOnShutdown(const Event &cb) = 0; virtual void setOnShutdown(const Event &cb) = 0;
protected:
virtual void onShutdown(const SockException &ex) = 0;
virtual void onPublishResult(const SockException &ex) = 0;
}; };
template<typename Parent, typename Delegate> template<typename Parent, typename Delegate>
@ -67,21 +71,21 @@ public:
/** /**
* *
* @param strUrl urlrtsp/rtmp * @param url urlrtsp/rtmp
*/ */
void publish(const string &strUrl) override { void publish(const string &url) override {
if (_delegate) { return _delegate ? _delegate->publish(url) : Parent::publish(url);
_delegate->publish(strUrl);
}
} }
/** /**
* *
*/ */
void teardown() override { void teardown() override {
if (_delegate) { return _delegate ? _delegate->teardown() : Parent::teardown();
_delegate->teardown(); }
}
std::shared_ptr<SockInfo> getSockInfo() const {
return dynamic_pointer_cast<SockInfo>(_delegate);
} }
/** /**
@ -104,8 +108,19 @@ public:
_on_shutdown = cb; _on_shutdown = cb;
} }
std::shared_ptr<SockInfo> getSockInfo() const { protected:
return dynamic_pointer_cast<SockInfo>(_delegate); void onShutdown(const SockException &ex) override {
if (_on_shutdown) {
_on_shutdown(ex);
_on_shutdown = nullptr;
}
}
void onPublishResult(const SockException &ex) override {
if (_on_publish) {
_on_publish(ex);
_on_publish = nullptr;
}
} }
protected: protected:

View File

@ -46,7 +46,7 @@ void RtmpPusher::teardown() {
} }
} }
void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) { void RtmpPusher::onPublishResult_l(const SockException &ex, bool handshake_done) {
DebugL << ex.what(); DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) { if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调 //主动shutdown的不触发回调
@ -55,14 +55,10 @@ void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) {
if (!handshake_done) { if (!handshake_done) {
//播放结果回调 //播放结果回调
_publish_timer.reset(); _publish_timer.reset();
if (_on_published) { onPublishResult(ex);
_on_published(ex);
}
} else { } else {
//播放成功后异常断开回调 //播放成功后异常断开回调
if (_on_shutdown) { onShutdown(ex);
_on_shutdown(ex);
}
} }
if (ex) { if (ex) {
@ -78,7 +74,7 @@ void RtmpPusher::publish(const string &url) {
_tc_url = string("rtmp://") + host_url + "/" + _app; _tc_url = string("rtmp://") + host_url + "/" + _app;
if (!_app.size() || !_stream_id.size()) { if (!_app.size() || !_stream_id.size()) {
onPublishResult(SockException(Err_other, "rtmp url非法"), false); onPublishResult_l(SockException(Err_other, "rtmp url非法"), false);
return; return;
} }
DebugL << host_url << " " << _app << " " << _stream_id; DebugL << host_url << " " << _app << " " << _stream_id;
@ -99,7 +95,7 @@ void RtmpPusher::publish(const string &url) {
if (!strongSelf) { if (!strongSelf) {
return false; return false;
} }
strongSelf->onPublishResult(SockException(Err_timeout, "publish rtmp timeout"), false); strongSelf->onPublishResult_l(SockException(Err_timeout, "publish rtmp timeout"), false);
return false; return false;
}, getPoller())); }, getPoller()));
@ -112,12 +108,12 @@ void RtmpPusher::publish(const string &url) {
void RtmpPusher::onErr(const SockException &ex){ void RtmpPusher::onErr(const SockException &ex){
//定时器_pPublishTimer为空后表明握手结束了 //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex, !_publish_timer); onPublishResult_l(ex, !_publish_timer);
} }
void RtmpPusher::onConnect(const SockException &err){ void RtmpPusher::onConnect(const SockException &err){
if (err) { if (err) {
onPublishResult(err, false); onPublishResult_l(err, false);
return; return;
} }
weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
@ -138,7 +134,7 @@ void RtmpPusher::onRecv(const Buffer::Ptr &buf){
} catch (exception &e) { } catch (exception &e) {
SockException ex(Err_other, e.what()); SockException ex(Err_other, e.what());
//定时器_pPublishTimer为空后表明握手结束了 //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex, !_publish_timer); onPublishResult_l(ex, !_publish_timer);
} }
} }
@ -226,10 +222,10 @@ inline void RtmpPusher::send_metaData(){
_rtmp_reader->setDetachCB([weak_self]() { _rtmp_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); strong_self->onPublishResult_l(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer);
} }
}); });
onPublishResult(SockException(Err_success, "success"), false); onPublishResult_l(SockException(Err_success, "success"), false);
//提升发送性能 //提升发送性能
setSocketFlags(); setSocketFlags();
} }

View File

@ -27,14 +27,6 @@ public:
void publish(const string &url) override ; void publish(const string &url) override ;
void teardown() override; void teardown() override;
void setOnPublished(const Event &cb) override {
_on_published = cb;
}
void setOnShutdown(const Event &cb) override{
_on_shutdown = cb;
}
protected: protected:
//for Tcpclient override //for Tcpclient override
void onRecv(const Buffer::Ptr &buf) override; void onRecv(const Buffer::Ptr &buf) override;
@ -48,7 +40,7 @@ protected:
} }
private: private:
void onPublishResult(const SockException &ex, bool handshake_done); void onPublishResult_l(const SockException &ex, bool handshake_done);
template<typename FUN> template<typename FUN>
inline void addOnResultCB(const FUN &fun) { inline void addOnResultCB(const FUN &fun) {
@ -81,16 +73,14 @@ private:
deque<function<void(AMFValue &dec)> > _deque_on_status; deque<function<void(AMFValue &dec)> > _deque_on_status;
unordered_map<int, function<void(AMFDecoder &dec)> > _map_on_result; unordered_map<int, function<void(AMFDecoder &dec)> > _map_on_result;
//事件监听
Event _on_shutdown;
Event _on_published;
//推流超时定时器 //推流超时定时器
std::shared_ptr<Timer> _publish_timer; std::shared_ptr<Timer> _publish_timer;
std::weak_ptr<RtmpMediaSource> _publish_src; std::weak_ptr<RtmpMediaSource> _publish_src;
RtmpMediaSource::RingType::RingReader::Ptr _rtmp_reader; RtmpMediaSource::RingType::RingReader::Ptr _rtmp_reader;
}; };
using RtmpPusherImp = PusherImp<RtmpPusher, PusherBase>;
} /* namespace mediakit */ } /* namespace mediakit */
#endif /* SRC_RTMP_RTMPPUSHER_H_ */ #endif /* SRC_RTMP_RTMPPUSHER_H_ */

View File

@ -57,7 +57,7 @@ void RtspPusher::teardown() {
void RtspPusher::publish(const string &url_str) { void RtspPusher::publish(const string &url_str) {
RtspUrl url; RtspUrl url;
if (!url.parse(url_str)) { if (!url.parse(url_str)) {
onPublishResult(SockException(Err_other, StrPrinter << "illegal rtsp url:" << url_str), false); onPublishResult_l(SockException(Err_other, StrPrinter << "illegal rtsp url:" << url_str), false);
return; return;
} }
@ -83,7 +83,7 @@ void RtspPusher::publish(const string &url_str) {
if (!strong_self) { if (!strong_self) {
return false; return false;
} }
strong_self->onPublishResult(SockException(Err_timeout, "publish rtsp timeout"), false); strong_self->onPublishResult_l(SockException(Err_timeout, "publish rtsp timeout"), false);
return false; return false;
}, getPoller())); }, getPoller()));
@ -94,7 +94,7 @@ void RtspPusher::publish(const string &url_str) {
startConnect(url._host, url._port, publish_timeout_sec); startConnect(url._host, url._port, publish_timeout_sec);
} }
void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) { void RtspPusher::onPublishResult_l(const SockException &ex, bool handshake_done) {
DebugL << ex.what(); DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) { if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调 //主动shutdown的不触发回调
@ -103,14 +103,10 @@ void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) {
if (!handshake_done) { if (!handshake_done) {
//播放结果回调 //播放结果回调
_publish_timer.reset(); _publish_timer.reset();
if (_on_published) { onPublishResult(ex);
_on_published(ex);
}
} else { } else {
//播放成功后异常断开回调 //播放成功后异常断开回调
if (_on_shutdown) { onShutdown(ex);
_on_shutdown(ex);
}
} }
if (ex) { if (ex) {
@ -120,12 +116,12 @@ void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) {
void RtspPusher::onErr(const SockException &ex) { void RtspPusher::onErr(const SockException &ex) {
//定时器_pPublishTimer为空后表明握手结束了 //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex, !_publish_timer); onPublishResult_l(ex, !_publish_timer);
} }
void RtspPusher::onConnect(const SockException &err) { void RtspPusher::onConnect(const SockException &err) {
if (err) { if (err) {
onPublishResult(err, false); onPublishResult_l(err, false);
return; return;
} }
sendAnnounce(); sendAnnounce();
@ -137,7 +133,7 @@ void RtspPusher::onRecv(const Buffer::Ptr &buf){
} catch (exception &e) { } catch (exception &e) {
SockException ex(Err_other, e.what()); SockException ex(Err_other, e.what());
//定时器_pPublishTimer为空后表明握手结束了 //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex, !_publish_timer); onPublishResult_l(ex, !_publish_timer);
} }
} }
@ -465,7 +461,7 @@ void RtspPusher::sendRecord() {
_rtsp_reader->setDetachCB([weak_self]() { _rtsp_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer); strong_self->onPublishResult_l(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer);
} }
}); });
if (_rtp_type != Rtsp::RTP_TCP) { if (_rtp_type != Rtsp::RTP_TCP) {
@ -479,7 +475,7 @@ void RtspPusher::sendRecord() {
return true; return true;
}, getPoller())); }, getPoller()));
} }
onPublishResult(SockException(Err_success, "success"), false); onPublishResult_l(SockException(Err_success, "success"), false);
//提升发送性能 //提升发送性能
setSocketFlags(); setSocketFlags();
}; };

View File

@ -36,14 +36,6 @@ public:
void publish(const string &url) override; void publish(const string &url) override;
void teardown() override; void teardown() override;
void setOnPublished(const Event &cb) override {
_on_published = cb;
}
void setOnShutdown(const Event & cb) override{
_on_shutdown = cb;
}
protected: protected:
//for Tcpclient override //for Tcpclient override
void onRecv(const Buffer::Ptr &buf) override; void onRecv(const Buffer::Ptr &buf) override;
@ -57,7 +49,7 @@ protected:
virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data, size_t len); virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data, size_t len);
private: private:
void onPublishResult(const SockException &ex, bool handshake_done); void onPublishResult_l(const SockException &ex, bool handshake_done);
void sendAnnounce(); void sendAnnounce();
void sendSetup(unsigned int track_idx); void sendSetup(unsigned int track_idx);
@ -102,9 +94,6 @@ private:
std::shared_ptr<Timer> _beat_timer; std::shared_ptr<Timer> _beat_timer;
std::weak_ptr<RtspMediaSource> _push_src; std::weak_ptr<RtspMediaSource> _push_src;
RtspMediaSource::RingType::RingReader::Ptr _rtsp_reader; RtspMediaSource::RingType::RingReader::Ptr _rtsp_reader;
//事件监听
Event _on_shutdown;
Event _on_published;
function<void(const Parser&)> _on_res_func; function<void(const Parser&)> _on_res_func;
////////// rtcp //////////////// ////////// rtcp ////////////////
//rtcp发送时间,trackid idx 为数组下标 //rtcp发送时间,trackid idx 为数组下标
@ -113,5 +102,7 @@ private:
vector<RtcpContext::Ptr> _rtcp_context; vector<RtcpContext::Ptr> _rtcp_context;
}; };
using RtspPusherImp = PusherImp<RtspPusher, PusherBase>;
} /* namespace mediakit */ } /* namespace mediakit */
#endif //ZLMEDIAKIT_RTSPPUSHER_H #endif //ZLMEDIAKIT_RTSPPUSHER_H