简化rtmp推流器代码

This commit is contained in:
xiongziliang 2019-03-26 18:04:06 +08:00
parent dc1cae2153
commit b5c3830a63
3 changed files with 12 additions and 19 deletions

@ -1 +1 @@
Subproject commit e589ef61db16b3d7c43661f1e421b451227af07e Subproject commit 8e90a8dbefe9060fdb86b9dc8036345aa69faf41

View File

@ -35,25 +35,15 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
unordered_map<string, RtmpPusher::rtmpCMDHandle> RtmpPusher::g_mapCmd;
RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) { RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) {
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream)); auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream));
if (!src) { if (!src) {
auto strErr = StrPrinter << "media source:" << strVhost << "/" << strApp << "/" << strStream << "not found!" << endl; auto strErr = StrPrinter << "media source:" << strVhost << "/" << strApp << "/" << strStream << "not found!" << endl;
throw std::runtime_error(strErr); throw std::runtime_error(strErr);
} }
init(src); _pMediaSrc=src;
} }
RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){ RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){
init(src);
}
void RtmpPusher::init(const RtmpMediaSource::Ptr &src){
static onceToken token([]() {
g_mapCmd.emplace("_error",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("_result",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("onStatus",&RtmpPusher::onCmd_onStatus);
}, []() {});
_pMediaSrc=src; _pMediaSrc=src;
} }
@ -266,6 +256,14 @@ void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) {
switch (chunkData.typeId) { switch (chunkData.typeId) {
case MSG_CMD: case MSG_CMD:
case MSG_CMD3: { case MSG_CMD3: {
typedef void (RtmpPusher::*rtmpCMDHandle)(AMFDecoder &dec);
static unordered_map<string, rtmpCMDHandle> g_mapCmd;
static onceToken token([]() {
g_mapCmd.emplace("_error",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("_result",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("onStatus",&RtmpPusher::onCmd_onStatus);
}, []() {});
AMFDecoder dec(chunkData.strBuf, 0); AMFDecoder dec(chunkData.strBuf, 0);
std::string type = dec.load<std::string>(); std::string type = dec.load<std::string>();
auto it = g_mapCmd.find(type); auto it = g_mapCmd.find(type);

View File

@ -64,7 +64,6 @@ protected:
send(buffer); send(buffer);
} }
private: private:
void init(const RtmpMediaSource::Ptr &src);
void onShutdown(const SockException &ex) { void onShutdown(const SockException &ex) {
_pPublishTimer.reset(); _pPublishTimer.reset();
if(_onShutdown){ if(_onShutdown){
@ -99,6 +98,7 @@ private:
inline void send_publish(); inline void send_publish();
inline void send_metaData(); inline void send_metaData();
private:
string _strApp; string _strApp;
string _strStream; string _strStream;
string _strTcUrl; string _strTcUrl;
@ -107,13 +107,8 @@ private:
recursive_mutex _mtxOnResultCB; recursive_mutex _mtxOnResultCB;
deque<function<void(AMFValue &dec)> > _dqOnStatusCB; deque<function<void(AMFValue &dec)> > _dqOnStatusCB;
recursive_mutex _mtxOnStatusCB; recursive_mutex _mtxOnStatusCB;
typedef void (RtmpPusher::*rtmpCMDHandle)(AMFDecoder &dec);
static unordered_map<string, rtmpCMDHandle> g_mapCmd;
//超时功能实现 //超时功能实现
std::shared_ptr<Timer> _pPublishTimer; std::shared_ptr<Timer> _pPublishTimer;
//源 //源
std::weak_ptr<RtmpMediaSource> _pMediaSrc; std::weak_ptr<RtmpMediaSource> _pMediaSrc;
RtmpMediaSource::RingType::RingReader::Ptr _pRtmpReader; RtmpMediaSource::RingType::RingReader::Ptr _pRtmpReader;