diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 50bcbff4..8e120c6f 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -214,7 +214,7 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data){ assert(ctx && dst_url && ssrc); MediaSource *src = (MediaSource *)ctx; - src->startSendRtp(dst_url, dst_port, ssrc, is_udp, [cb, user_data](const SockException &ex){ + src->startSendRtp(dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ if (cb) { cb(user_data, ex.getErrCode(), ex.what()); } @@ -224,7 +224,7 @@ API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ct API_EXPORT int API_CALL mk_media_source_stop_send_rtp(const mk_media_source ctx){ assert(ctx); MediaSource *src = (MediaSource *) ctx; - return src->stopSendRtp(); + return src->stopSendRtp(""); } API_EXPORT void API_CALL mk_media_source_find(const char *schema, diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 53fb024e..d2d4dcc3 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -193,7 +193,7 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; //sender参数无用 - (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, [cb, user_data](const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ if (cb) { cb(user_data, ex.getErrCode(), ex.what()); } @@ -204,5 +204,5 @@ API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx){ assert(ctx); MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; //sender参数无用 - return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1); + return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1, ""); } \ No newline at end of file diff --git a/api/source/mk_thread.cpp b/api/source/mk_thread.cpp index 08778287..fca1cec6 100644 --- a/api/source/mk_thread.cpp +++ b/api/source/mk_thread.cpp @@ -77,7 +77,7 @@ public: _task->cancel(); } - void start(int ms ,EventPoller &poller){ + void start(uint64_t ms ,EventPoller &poller){ weak_ptr weak_self = shared_from_this(); _task = poller.doDelayTask(ms, [weak_self]() { auto strong_self = weak_self.lock(); diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 768b2e64..02a4030f 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -850,7 +850,8 @@ void installWebApi() { throw ApiRetException("该媒体流不存在", API::OtherFailed); } - src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], [val, headerOut, invoker](const SockException &ex){ + //src_port为空时,则随机本地端口 + src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], allArgs["src_port"], [val, headerOut, invoker](const SockException &ex){ if (ex) { const_cast(val)["code"] = API::OtherFailed; const_cast(val)["msg"] = ex.what(); @@ -868,7 +869,8 @@ void installWebApi() { throw ApiRetException("该媒体流不存在", API::OtherFailed); } - if (!src->stopSendRtp()) { + //ssrc如果为空,关闭全部 + if (!src->stopSendRtp(allArgs["ssrc"])) { throw ApiRetException("尚未开始推流,停止失败", API::OtherFailed); } }); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index afe42ff7..966620f7 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -183,21 +183,21 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (!listener) { cb(SockException(Err_other, "尚未设置事件监听器")); return; } - return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb); + return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, src_port, cb); } -bool MediaSource::stopSendRtp() { +bool MediaSource::stopSendRtp(const string &ssrc) { auto listener = _listener.lock(); if (!listener) { return false; } - return listener->stopSendRtp(*this); + return listener->stopSendRtp(*this, ssrc); } void MediaSource::for_each_media(const function &cb) { @@ -642,19 +642,19 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (listener) { - listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb); } else { - MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb); } } -bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){ +bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender, const string &ssrc){ auto listener = _listener.lock(); if (listener) { - return listener->stopSendRtp(sender); + return listener->stopSendRtp(sender, ssrc); } return false; } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 965e1e57..0095ee22 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -83,9 +83,9 @@ public: // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) { cb(SockException(Err_other, "not implemented"));}; // 停止发送ps-rtp - virtual bool stopSendRtp(MediaSource &sender) {return false; } + virtual bool stopSendRtp(MediaSource &sender, const string &ssrc) {return false; } private: Timer::Ptr _async_close_timer; @@ -112,8 +112,8 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; - bool stopSendRtp(MediaSource &sender) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; + bool stopSendRtp(MediaSource &sender, const string &ssrc) override; private: std::weak_ptr _listener; @@ -256,9 +256,9 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb); + void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb); // 停止发送ps-rtp - bool stopSendRtp(); + bool stopSendRtp(const string &ssrc); ////////////////static方法,查找或生成MediaSource//////////////// diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index f3d44352..e9db150f 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -329,11 +329,11 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ #if defined(ENABLE_RTPPROXY) RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); - rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) { + rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](const SockException &ex) { cb(ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { @@ -343,21 +343,29 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ rtp_sender->addTrack(track); } rtp_sender->addTrackCompleted(); - strong_self->_rtp_sender = rtp_sender; + lock_guard lck(strong_self->_rtp_sender_mtx); + strong_self->_rtp_sender[ssrc] = rtp_sender; }); #else cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); #endif//ENABLE_RTPPROXY } -bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ +bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){ #if defined(ENABLE_RTPPROXY) - if (_rtp_sender) { - _rtp_sender = nullptr; - return true; + if (ssrc.empty()) { + //关闭全部 + lock_guard lck(_rtp_sender_mtx); + auto size = _rtp_sender.size(); + _rtp_sender.clear(); + return size; } -#endif//ENABLE_RTPPROXY + //关闭特定的 + lock_guard lck(_rtp_sender_mtx); + return _rtp_sender.erase(ssrc); +#else return false; +#endif//ENABLE_RTPPROXY } void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { @@ -442,9 +450,9 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { _muxer->inputFrame(frame); #if defined(ENABLE_RTPPROXY) - auto rtp_sender = _rtp_sender; - if (rtp_sender) { - rtp_sender->inputFrame(frame); + lock_guard lck(_rtp_sender_mtx); + for (auto &pr : _rtp_sender) { + pr.second->inputFrame(frame); } #endif //ENABLE_RTPPROXY @@ -456,7 +464,7 @@ bool MultiMediaSourceMuxer::isEnabled(){ //无人观看时,每次检查是否真的无人观看 //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) #if defined(ENABLE_RTPPROXY) - _is_enable = (_muxer->isEnabled() || _rtp_sender); + _is_enable = (_muxer->isEnabled() || _rtp_sender.size()); #else _is_enable = _muxer->isEnabled(); #endif //ENABLE_RTPPROXY diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 36729447..2ffe8163 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -142,13 +142,13 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; /** * 停止ps-rtp发送 * @return 是否成功 */ - bool stopSendRtp(MediaSource &sender) override; + bool stopSendRtp(MediaSource &sender, const string &ssrc) override; /////////////////////////////////MediaSinkInterface override///////////////////////////////// @@ -189,7 +189,8 @@ private: MultiMuxerPrivate::Ptr _muxer; std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) - RtpSender::Ptr _rtp_sender; + mutex _rtp_sender_mtx; + unordered_map _rtp_sender; #endif //ENABLE_RTPPROXY }; diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 8fdb531a..50710a1e 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -26,14 +26,15 @@ RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { RtpSender::~RtpSender() { } -void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ +void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb){ _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; _dst_port = dst_port; + _src_port = src_port; weak_ptr weak_self = shared_from_this(); if (is_udp) { - _socket->bindUdpSock(0); + _socket->bindUdpSock(src_port); auto poller = _poller; WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() { struct sockaddr addr; @@ -65,7 +66,7 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, //tcp连接成功 strong_self->onConnect(); } - }); + }, 5.0F, "0.0.0.0", src_port); } } @@ -149,7 +150,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { if (!strong_self) { return false; } - strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){ + strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](const SockException &ex){ auto strong_self = weak_self.lock(); if (strong_self && ex) { //连接失败且本对象未销毁,那么重试连接 diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 86a58894..840792f7 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -37,7 +37,7 @@ public: * @param is_udp 是否采用udp方式发送rtp * @param cb 连接目标端口是否成功的回调 */ - void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb); + void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb); /** * 输入帧数据 @@ -74,6 +74,7 @@ private: bool _is_connect = false; string _dst_url; uint16_t _dst_port; + uint16_t _src_port; Socket::Ptr _socket; EventPoller::Ptr _poller; Timer::Ptr _connect_timer;