2017-10-09 22:11:01 +08:00
|
|
|
|
/*
 * MIT License
 *
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
|
|
|
|
|
#include "RtmpPusher.h"
|
|
|
|
|
#include "Rtmp/utils.h"
|
|
|
|
|
#include "Rtsp/Rtsp.h"
|
2017-04-25 11:35:41 +08:00
|
|
|
|
#include "Util/util.h"
|
|
|
|
|
#include "Util/onceToken.h"
|
|
|
|
|
#include "Thread/ThreadPool.h"
|
2017-04-01 16:35:56 +08:00
|
|
|
|
|
|
|
|
|
using namespace ZL::Util;
|
|
|
|
|
|
|
|
|
|
namespace ZL {
|
|
|
|
|
namespace Rtmp {
|
|
|
|
|
|
|
|
|
|
// Dispatch table keyed by RTMP command name; each value is the RtmpPusher member
// function that handles that command. Filled once by RtmpPusher::init() and
// consulted by RtmpPusher::onRtmpChunk().
unordered_map<string, RtmpPusher::rtmpCMDHandle> RtmpPusher::g_mapCmd;
|
|
|
|
|
/**
 * Creates a pusher for the media source looked up by application and stream name.
 *
 * @param strApp    application name handed to RtmpMediaSource::find
 * @param strStream stream name handed to RtmpMediaSource::find
 * @throws std::runtime_error when RtmpMediaSource::find returns no source
 */
RtmpPusher::RtmpPusher(const char *strApp,const char *strStream) {
    auto src = RtmpMediaSource::find(strApp,strStream);
    if (!src) {
        // The leading space keeps the stream name from running into "not found!".
        auto strErr = StrPrinter << "media source:" << strApp << "/" << strStream << " not found!" << endl;
        throw std::runtime_error(strErr);
    }
    init(src);
}
|
|
|
|
|
/**
 * Creates a pusher for an already resolved media source.
 *
 * @param src media source forwarded unchanged to init()
 */
RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src)
{
    // All construction work is shared with the (app, stream) constructor.
    this->init(src);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Shared constructor body: registers the command handlers in g_mapCmd and
 * stores the media source in m_pMediaSrc.
 *
 * @param src media source to remember for this pusher
 */
void RtmpPusher::init(const RtmpMediaSource::Ptr &src)
{
    // Function-local static: the registration lambda is handed to onceToken
    // only the first time control passes through this declaration.
    static onceToken s_registerHandlers(
        []() {
            // "_error" and "_result" replies share one handler.
            g_mapCmd.emplace("_error", &RtmpPusher::onCmd_result);
            g_mapCmd.emplace("_result", &RtmpPusher::onCmd_result);
            g_mapCmd.emplace("onStatus", &RtmpPusher::onCmd_onStatus);
        },
        []() {});

    m_pMediaSrc = src;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Destructor: runs teardown() and then emits a debug log line.
 */
RtmpPusher::~RtmpPusher()
{
    // Release session state first; the log line marks the end of destruction.
    this->teardown();
    DebugL << endl;
}
|
|
|
|
|
void RtmpPusher::teardown() {
|
|
|
|
|
if (alive()) {
|
|
|
|
|
m_strApp.clear();
|
|
|
|
|
m_strStream.clear();
|
|
|
|
|
m_strTcUrl.clear();
|
2017-05-12 13:59:58 +08:00
|
|
|
|
{
|
|
|
|
|
lock_guard<recursive_mutex> lck(m_mtxOnResultCB);
|
|
|
|
|
m_mapOnResultCB.clear();
|
|
|
|
|
}
|
2017-04-01 16:35:56 +08:00
|
|
|
|
{
|
2017-05-12 13:59:58 +08:00
|
|
|
|
lock_guard<recursive_mutex> lck(m_mtxOnStatusCB);
|
2017-04-01 16:35:56 +08:00
|
|
|
|
m_dqOnStatusCB.clear();
|
|
|
|
|
}
|
2017-06-06 20:06:31 +08:00
|
|
|
|
m_pPublishTimer.reset();
|
2017-08-03 13:55:46 +08:00
|
|
|
|
reset();
|
2017-04-01 16:35:56 +08:00
|
|
|
|
shutdown();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void RtmpPusher::publish(const char* strUrl) {
|
|
|
|
|
teardown();
|
|
|
|
|
string strHost = FindField(strUrl, "://", "/");
|
|
|
|
|
m_strApp = FindField(strUrl, (strHost + "/").data(), "/");
|
|
|
|
|
m_strStream = FindField(strUrl, (strHost + "/" + m_strApp + "/").data(), NULL);
|
|
|
|
|
m_strTcUrl = string("rtmp://") + strHost + "/" + m_strApp;
|
|
|
|
|
|
|
|
|
|
if (!m_strApp.size() || !m_strStream.size()) {
|
2017-06-06 20:06:31 +08:00
|
|
|
|
onPublishResult(SockException(Err_other,"rtmp url非法"));
|
2017-04-01 16:35:56 +08:00
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
DebugL << strHost << " " << m_strApp << " " << m_strStream;
|
|
|
|
|
|
|
|
|
|
auto iPort = atoi(FindField(strHost.c_str(), ":", NULL).c_str());
|
|
|
|
|
if (iPort <= 0) {
|
|
|
|
|
//rtmp 默认端口1935
|
|
|
|
|
iPort = 1935;
|
|
|
|
|
} else {
|
|
|
|
|
//服务器域名
|
|
|
|
|
strHost = FindField(strHost.c_str(), NULL, ":");
|
|
|
|
|
}
|
|
|
|
|
startConnect(strHost, iPort);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void RtmpPusher::onErr(const SockException &ex){
|
2017-06-06 20:06:31 +08:00
|
|
|
|
onShutdown(ex);
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
void RtmpPusher::onConnect(const SockException &err){
|
|
|
|
|
if(err.getErrCode()!=Err_success) {
|
2017-06-06 20:06:31 +08:00
|
|
|
|
onPublishResult(err);
|
2017-04-01 16:35:56 +08:00
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
|
2017-06-06 20:06:31 +08:00
|
|
|
|
m_pPublishTimer.reset( new Timer(10, [weakSelf]() {
|
2017-04-01 16:35:56 +08:00
|
|
|
|
auto strongSelf=weakSelf.lock();
|
|
|
|
|
if(!strongSelf) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
2017-06-06 20:06:31 +08:00
|
|
|
|
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"));
|
2017-04-01 16:35:56 +08:00
|
|
|
|
strongSelf->teardown();
|
|
|
|
|
return false;
|
|
|
|
|
}));
|
|
|
|
|
startClientSession([weakSelf](){
|
|
|
|
|
auto strongSelf=weakSelf.lock();
|
|
|
|
|
if(!strongSelf) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
//strongSelf->sendChunkSize(60000);
|
|
|
|
|
strongSelf->send_connect();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
/**
 * Receive callback: feeds the received bytes to onParseRtmp.
 *
 * Any exception derived from std::exception thrown while parsing is converted
 * to a SockException(Err_other), reported through onPublishResult and
 * onShutdown (in that order), and the session is then torn down.
 *
 * @param pBuf buffer holding the received bytes
 */
void RtmpPusher::onRecv(const Socket::Buffer::Ptr &pBuf)
{
    try {
        onParseRtmp(pBuf->data(), pBuf->size());
    } catch (const exception &parseErr) {
        const SockException sockErr(Err_other, parseErr.what());
        onPublishResult(sockErr);
        onShutdown(sockErr);
        teardown();
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
inline void RtmpPusher::send_connect() {
|
|
|
|
|
AMFValue obj(AMF_OBJECT);
|
|
|
|
|
obj.set("app", m_strApp);
|
|
|
|
|
obj.set("type", "nonprivate");
|
|
|
|
|
obj.set("tcUrl", m_strTcUrl);
|
|
|
|
|
obj.set("swfUrl", m_strTcUrl);
|
|
|
|
|
sendInvoke("connect", obj);
|
|
|
|
|
addOnResultCB([this](AMFDecoder &dec){
|
|
|
|
|
//TraceL << "connect result";
|
|
|
|
|
dec.load<AMFValue>();
|
|
|
|
|
auto val = dec.load<AMFValue>();
|
|
|
|
|
auto level = val["level"].as_string();
|
|
|
|
|
auto code = val["code"].as_string();
|
|
|
|
|
if(level != "status"){
|
2017-08-09 18:39:30 +08:00
|
|
|
|
throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
send_createStream();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline void RtmpPusher::send_createStream() {
|
|
|
|
|
AMFValue obj(AMF_NULL);
|
|
|
|
|
sendInvoke("createStream", obj);
|
|
|
|
|
addOnResultCB([this](AMFDecoder &dec){
|
|
|
|
|
//TraceL << "createStream result";
|
|
|
|
|
dec.load<AMFValue>();
|
|
|
|
|
m_ui32StreamId = dec.load<int>();
|
|
|
|
|
send_publish();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
inline void RtmpPusher::send_publish() {
|
|
|
|
|
AMFEncoder enc;
|
|
|
|
|
enc << "publish" << ++m_iReqID << nullptr << m_strStream << m_strApp ;
|
|
|
|
|
sendRequest(MSG_CMD, enc.data());
|
|
|
|
|
|
|
|
|
|
addOnStatusCB([this](AMFValue &val) {
|
|
|
|
|
auto level = val["level"].as_string();
|
|
|
|
|
auto code = val["code"].as_string();
|
|
|
|
|
if(level != "status") {
|
2017-08-09 18:39:30 +08:00
|
|
|
|
throw std::runtime_error(StrPrinter <<"publish 失败:" << level << " " << code << endl);
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
//start send media
|
|
|
|
|
send_metaData();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline void RtmpPusher::send_metaData(){
|
|
|
|
|
auto src = m_pMediaSrc.lock();
|
|
|
|
|
if (!src) {
|
2017-08-09 18:39:30 +08:00
|
|
|
|
throw std::runtime_error("the media source was released");
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
if (!src->ready()) {
|
2017-08-09 18:39:30 +08:00
|
|
|
|
throw std::runtime_error("the media source is not ready");
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
AMFEncoder enc;
|
|
|
|
|
enc << "@setDataFrame" << "onMetaData" << src->getMetaData();
|
|
|
|
|
sendRequest(MSG_DATA, enc.data());
|
|
|
|
|
|
|
|
|
|
src->getConfigFrame([&](const RtmpPacket &pkt){
|
|
|
|
|
sendRtmp(pkt.typeId, m_ui32StreamId, pkt.strBuf, pkt.timeStamp, pkt.chunkId);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
m_pRtmpReader = src->getRing()->attach();
|
|
|
|
|
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
|
|
|
|
|
m_pRtmpReader->setReadCB([weakSelf](const RtmpPacket &pkt){
|
|
|
|
|
auto strongSelf = weakSelf.lock();
|
|
|
|
|
if(!strongSelf) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
strongSelf->sendRtmp(pkt.typeId, strongSelf->m_ui32StreamId, pkt.strBuf, pkt.timeStamp, pkt.chunkId);
|
|
|
|
|
});
|
|
|
|
|
m_pRtmpReader->setDetachCB([weakSelf](){
|
|
|
|
|
auto strongSelf = weakSelf.lock();
|
|
|
|
|
if(strongSelf){
|
2017-06-06 20:06:31 +08:00
|
|
|
|
strongSelf->onShutdown(SockException(Err_other,"媒体源被释放"));
|
2017-04-01 16:35:56 +08:00
|
|
|
|
strongSelf->teardown();
|
|
|
|
|
}
|
|
|
|
|
});
|
2017-06-06 20:06:31 +08:00
|
|
|
|
onPublishResult(SockException(Err_success,"success"));
|
2017-04-01 16:35:56 +08:00
|
|
|
|
}
|
|
|
|
|
/**
 * Handles a "_result" / "_error" reply by running the callback registered
 * for its request id.
 *
 * The request id is read from the decoder as an int. If a callback is stored
 * under that id it is removed from m_mapOnResultCB and then invoked with the
 * decoder; otherwise a warning is logged.
 *
 * @param dec decoder positioned right after the command name
 * @throws whatever the invoked callback throws
 */
void RtmpPusher::onCmd_result(AMFDecoder &dec){
    auto iReqId = dec.load<int>();
    // Recursive mutex: the callback may lock it again on this thread.
    lock_guard<recursive_mutex> lck(m_mtxOnResultCB);
    auto it = m_mapOnResultCB.find(iReqId);
    if (it == m_mapOnResultCB.end()) {
        WarnL << "unhandled _result";
        return;
    }
    // Take the callback out and erase its entry BEFORE running it. The
    // callback may touch m_mapOnResultCB itself (the connect and createStream
    // callbacks register the next result callback, and teardown() clears the
    // map), which could leave `it` invalid for an erase done afterwards.
    auto fnOnResult = it->second;
    m_mapOnResultCB.erase(it);
    fnOnResult(dec);
}
|
|
|
|
|
/**
 * Handles an "onStatus" command.
 *
 * Values are read from the decoder until one of type AMF_OBJECT is found.
 * If a status callback is pending, the oldest one is removed from
 * m_dqOnStatusCB and invoked with that object. With no pending callback,
 * a string "level" other than "status" is treated as a failure.
 *
 * @param dec decoder positioned right after the command name
 * @throws std::runtime_error when no callback is pending and "level" is a
 *         string different from "status"; also whatever dec.load or the
 *         invoked callback throws
 */
void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) {
    // Skip leading values until the status object shows up. The loop can only
    // end on an AMF_OBJECT, so no further type check is needed afterwards.
    // NOTE(review): termination on a reply without any object relies on
    // dec.load throwing once the data runs out - confirm in AMFDecoder.
    AMFValue val;
    do {
        val = dec.load<AMFValue>();
    } while (val.type() != AMF_OBJECT);

    // Recursive mutex: the callback may lock it again on this thread.
    lock_guard<recursive_mutex> lck(m_mtxOnStatusCB);
    if (!m_dqOnStatusCB.empty()) {
        // Dequeue BEFORE invoking: if the callback ends up in teardown(),
        // the deque is cleared while the callback is still running, and a
        // pop_front() afterwards would operate on an empty container.
        auto fnOnStatus = m_dqOnStatusCB.front();
        m_dqOnStatusCB.pop_front();
        fnOnStatus(val);
        return;
    }

    // Unsolicited status: only a string "level" other than "status" is an error.
    auto level = val["level"];
    auto code = val["code"].as_string();
    if (level.type() == AMF_STRING && level.as_string() != "status") {
        throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Dispatches one complete RTMP message.
 *
 * Only MSG_CMD and MSG_CMD3 messages are processed: the command name is
 * decoded from the payload and the matching handler from g_mapCmd is invoked
 * with the decoder. Unknown commands are logged; every other message type is
 * ignored.
 *
 * @param chunkData the received message
 */
void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData)
{
    const bool isCommand = (chunkData.typeId == MSG_CMD) || (chunkData.typeId == MSG_CMD3);
    if (!isCommand) {
        //WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
        return;
    }

    AMFDecoder dec(chunkData.strBuf, 0);
    const std::string cmdName = dec.load<std::string>();

    const auto handlerIt = g_mapCmd.find(cmdName);
    if (handlerIt == g_mapCmd.end()) {
        WarnL << "can not support cmd:" << cmdName;
        return;
    }

    // The table stores pointers to member functions; call through `this`.
    const auto handler = handlerIt->second;
    (this->*handler)(dec);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} /* namespace Rtmp */
|
|
|
|
|
} /* namespace ZL */
|
|
|
|
|
|