change srt streamid like srs srt streamid

This commit is contained in:
xiongguangjie 2022-06-11 21:37:41 +08:00
parent 4d6cff36b8
commit f8373302d0
5 changed files with 151 additions and 61 deletions

View File

@ -11,50 +11,53 @@
#ifndef ZLMEDIAKIT_MACROS_H #ifndef ZLMEDIAKIT_MACROS_H
#define ZLMEDIAKIT_MACROS_H #define ZLMEDIAKIT_MACROS_H
#include "Util/logger.h"
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include "Util/logger.h"
#if defined(__MACH__) #if defined(__MACH__)
#include <arpa/inet.h> #include <arpa/inet.h>
#include <machine/endian.h> #include <machine/endian.h>
#define __BYTE_ORDER BYTE_ORDER #define __BYTE_ORDER BYTE_ORDER
#define __BIG_ENDIAN BIG_ENDIAN #define __BIG_ENDIAN BIG_ENDIAN
#define __LITTLE_ENDIAN LITTLE_ENDIAN #define __LITTLE_ENDIAN LITTLE_ENDIAN
#elif defined(__linux__) #elif defined(__linux__)
#include <endian.h> #include <arpa/inet.h>
#include <arpa/inet.h> #include <endian.h>
#elif defined(_WIN32) #elif defined(_WIN32)
#define BIG_ENDIAN 1 #define BIG_ENDIAN 1
#define LITTLE_ENDIAN 0 #define LITTLE_ENDIAN 0
#define BYTE_ORDER LITTLE_ENDIAN #define BYTE_ORDER LITTLE_ENDIAN
#define __BYTE_ORDER BYTE_ORDER #define __BYTE_ORDER BYTE_ORDER
#define __BIG_ENDIAN BIG_ENDIAN #define __BIG_ENDIAN BIG_ENDIAN
#define __LITTLE_ENDIAN LITTLE_ENDIAN #define __LITTLE_ENDIAN LITTLE_ENDIAN
#endif #endif
#ifndef PACKED #ifndef PACKED
#if !defined(_WIN32) #if !defined(_WIN32)
#define PACKED __attribute__((packed)) #define PACKED __attribute__((packed))
#else #else
#define PACKED #define PACKED
#endif //!defined(_WIN32) #endif //! defined(_WIN32)
#endif #endif
#ifndef CHECK #ifndef CHECK
#define CHECK(exp,...) mediakit::Assert_ThrowCpp(!(exp), #exp, __FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__) #define CHECK(exp, ...) mediakit::Assert_ThrowCpp(!(exp), #exp, __FUNCTION__, __FILE__, __LINE__, ##__VA_ARGS__)
#endif//CHECK #endif // CHECK
#ifndef MAX #ifndef MAX
#define MAX(a,b) ((a) > (b) ? (a) : (b) ) #define MAX(a, b) ((a) > (b) ? (a) : (b))
#endif //MAX #endif // MAX
#ifndef MIN #ifndef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b) ) #define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif //MIN #endif // MIN
#ifndef CLEAR_ARR #ifndef CLEAR_ARR
#define CLEAR_ARR(arr) for(auto &item : arr){ item = 0;} #define CLEAR_ARR(arr) \
#endif //CLEAR_ARR for (auto &item : arr) { \
item = 0; \
}
#endif // CLEAR_ARR
#define VHOST_KEY "vhost" #define VHOST_KEY "vhost"
#define HTTP_SCHEMA "http" #define HTTP_SCHEMA "http"
@ -64,6 +67,7 @@
#define HLS_SCHEMA "hls" #define HLS_SCHEMA "hls"
#define TS_SCHEMA "ts" #define TS_SCHEMA "ts"
#define FMP4_SCHEMA "fmp4" #define FMP4_SCHEMA "fmp4"
#define SRT_SCHEMA "srt"
#define DEFAULT_VHOST "__defaultVhost__" #define DEFAULT_VHOST "__defaultVhost__"
#ifdef __cplusplus #ifdef __cplusplus
@ -78,7 +82,7 @@ namespace mediakit {
extern const char kServerName[]; extern const char kServerName[];
template<typename ...ARGS> template <typename... ARGS>
void Assert_ThrowCpp(int failed, const char *exp, const char *func, const char *file, int line, ARGS &&...args) { void Assert_ThrowCpp(int failed, const char *exp, const char *func, const char *file, int line, ARGS &&...args) {
if (failed) { if (failed) {
std::stringstream ss; std::stringstream ss;
@ -87,5 +91,5 @@ void Assert_ThrowCpp(int failed, const char *exp, const char *func, const char *
} }
} }
}//namespace mediakit } // namespace mediakit
#endif //ZLMEDIAKIT_MACROS_H #endif // ZLMEDIAKIT_MACROS_H

View File

@ -1,10 +1,11 @@
#include <memory> #include "Util/util.h"
#include "Util/util.h" #include <memory>
#include "SrtTransportImp.hpp" #include "SrtTransportImp.hpp"
namespace SRT { namespace SRT {
SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) : SrtTransport(poller) {} SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller)
: SrtTransport(poller) {}
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
@ -12,7 +13,7 @@ SrtTransportImp::~SrtTransportImp() {
WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/" WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/"
<< _media_info._streamid << ")断开,耗时(s):" << duration; << _media_info._streamid << ")断开,耗时(s):" << duration;
//流量统计事件广播 // 流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes >= iFlowThreshold * 1024) { if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent( NoticeCenter::Instance().emitEvent(
@ -28,15 +29,13 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr
} }
_is_pusher = false; _is_pusher = false;
TraceL << " stream id " << streamid; TraceL << " stream id " << streamid;
if (streamid.empty()) { if (!parseStreamid(streamid)) {
onShutdown(SockException(Err_shutdown, "stream id not empty")); onShutdown(SockException(Err_shutdown, "stream id not vaild"));
return; return;
} }
_media_info.parse("srt://" + streamid);
auto params = Parser::parseArgs(_media_info._param_strs); auto params = Parser::parseArgs(_media_info._param_strs);
if (params["type"] == "push") { if (params["m"] == "publish") {
_is_pusher = true; _is_pusher = true;
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this); _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
emitOnPublish(); emitOnPublish();
@ -46,6 +45,56 @@ void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr
} }
} }
//
bool SrtTransportImp::parseStreamid(std::string &streamid) {
if (!toolkit::start_with(streamid, "#!::")) {
return false;
}
_media_info._schema = SRT_SCHEMA;
std::string real_streamid = streamid.substr(4);
std::string vhost, app, stream_name;
auto params = Parser::parseArgs(real_streamid, ",", "=");
for (auto it : params) {
if (it.first == "h") {
vhost = it.second;
} else if (it.first == "r") {
auto tmps = toolkit::split(it.second, "/");
if (tmps.size() < 2) {
continue;
}
app = tmps[0];
stream_name = tmps[1];
} else {
if (_media_info._param_strs.empty()) {
_media_info._param_strs = it.first + "=" + it.second;
} else {
_media_info._param_strs += "&" + it.first + "=" + it.second;
}
}
}
if (app == "" || stream_name == "") {
return false;
}
if (vhost != "") {
_media_info._vhost = vhost;
} else {
_media_info._vhost = DEFAULT_VHOST;
}
_media_info._app = app;
_media_info._streamid = stream_name;
TraceL << " vhost=" << _media_info._vhost << " app=" << _media_info._app << " streamid=" << _media_info._streamid
<< " params=" << _media_info._param_strs;
return true;
}
void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
if (!_is_pusher) { if (!_is_pusher) {
WarnP(this) << "this is a player data ignore"; WarnP(this) << "this is a player data ignore";
@ -66,16 +115,14 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) {
if (!force && totalReaderCount(sender)) { if (!force && totalReaderCount(sender)) {
return false; return false;
} }
std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" std::string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/"
<< sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
<< sender.getApp() << "/"
<< sender.getId() << " " << force;
weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {
strong_self->onShutdown(SockException(Err_shutdown, err)); strong_self->onShutdown(SockException(Err_shutdown, err));
//主动关闭推流,那么不延时注销 // 主动关闭推流,那么不延时注销
strong_self->_muxer = nullptr; strong_self->_muxer = nullptr;
} }
}); });
@ -122,12 +169,12 @@ void SrtTransportImp::emitOnPublish() {
} }
}; };
//触发推流鉴权事件 // 触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent( auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker,
static_cast<SockInfo &>(*this)); static_cast<SockInfo &>(*this));
if (!flag) { if (!flag) {
//该事件无人监听,默认不鉴权 // 该事件无人监听,默认不鉴权
invoker("", ProtocolOption()); invoker("", ProtocolOption());
} }
} }
@ -156,19 +203,19 @@ void SrtTransportImp::emitOnPlay() {
} }
void SrtTransportImp::doPlay() { void SrtTransportImp::doPlay() {
//异步查找直播流 // 异步查找直播流
MediaInfo info = _media_info; MediaInfo info = _media_info;
info._schema = TS_SCHEMA; info._schema = TS_SCHEMA;
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) { MediaSource::findAsync(info, getSession(), [weak_self](const MediaSource::Ptr &src) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 // 本对象已经销毁
TraceL << "本对象已经销毁"; TraceL << "本对象已经销毁";
return; return;
} }
if (!src) { if (!src) {
//未找到该流 // 未找到该流
TraceL << "未找到该流"; TraceL << "未找到该流";
strong_self->onShutdown(SockException(Err_shutdown)); strong_self->onShutdown(SockException(Err_shutdown));
} else { } else {
@ -180,7 +227,7 @@ void SrtTransportImp::doPlay() {
strong_self->_ts_reader->setDetachCB([weak_self]() { strong_self->_ts_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 // 本对象已经销毁
return; return;
} }
strong_self->onShutdown(SockException(Err_shutdown)); strong_self->onShutdown(SockException(Err_shutdown));
@ -188,7 +235,7 @@ void SrtTransportImp::doPlay() {
strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 // 本对象已经销毁
return; return;
} }
size_t i = 0; size_t i = 0;

View File

@ -1,11 +1,11 @@
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#include <mutex> #include "Common/MultiMediaSourceMuxer.h"
#include "Rtp/Decoder.h" #include "Rtp/Decoder.h"
#include "SrtTransport.hpp" #include "SrtTransport.hpp"
#include "TS/TSMediaSource.h" #include "TS/TSMediaSource.h"
#include "Common/MultiMediaSourceMuxer.h" #include <mutex>
namespace SRT { namespace SRT {
@ -66,6 +66,7 @@ protected:
bool inputFrame(const Frame::Ptr &frame) override; bool inputFrame(const Frame::Ptr &frame) override;
private: private:
bool parseStreamid(std::string &streamid);
void emitOnPublish(); void emitOnPublish();
void emitOnPlay(); void emitOnPlay();

View File

@ -9,17 +9,36 @@
## 使用 ## 使用
zlm中的srt更加streamid 来确定是推流还是拉流来确定vhost,app,streamid(ZLM中的) zlm中的srt根据streamid 来确定是推流还是拉流来确定vhost,app,streamid(ZLM中的)、
srt中的streamid 为 `<vhost>/<app>/<streamid>?type=<push|play>& <other>=<other>`
srt中的streamid 为 `#!::key1=value1,key2=value2,key3=value4......`
h,r为特殊的key,来确定vhost,app,streamid,如果没有h则vhost为默认值
m 为特殊key来确定是推流还是拉流如果为publish 则为推流,否则为拉流 ,如果不存在m,则默认为拉流
其他key与m会作为webhook的鉴权参数
如:
#!::h=zlmediakit.com,r=live/test,m=publish
vhost = zlmediakit.com
app = live
streamid = test
是推流
- OBS 推流地址 - OBS 推流地址
`srt://192.168.1.105:9000?streamid=__defaultVhost__/live/test?type=push` `srt://192.168.1.105:9000?streamid=#!::r=live/test,m=publish`
- ffmpeg 推流 - ffmpeg 推流
`ffmpeg -re -stream_loop -1 -i test.ts -c:v copy -c:a copy -f mpegts srt://192.168.1.105:9000?streamid="__defaultVhost__/live/test?type=push"` `ffmpeg -re -stream_loop -1 -i test.ts -c:v copy -c:a copy -f mpegts srt://192.168.1.105:9000?streamid=#!::r=live/test,m=publish`
- ffplay 拉流 - ffplay 拉流
`ffplay -i srt://192.168.1.105:9000?streamid=__defaultVhost__/live/test` `ffplay -i srt://192.168.1.105:9000?streamid=#!::r=live/test`
- vlc 不支持,因为无法指定streamid[参考](https://github.com/Haivision/srt/issues/1015) - vlc 不支持,因为无法指定streamid[参考](https://github.com/Haivision/srt/issues/1015)

View File

@ -9,16 +9,35 @@
## usage ## usage
zlm get vhost,app,streamid and push or play by streamid of srt like this `<vhost>/<app>/<streamid>?type=<push|play>& <other>=<other>` zlm get vhost,app,streamid and push or play by streamid of srt like this
`#!::key1=value1,key2=value2,key3=value4......`
h and r is special key,to get vhost app streamid, if h not exist ,vhost is default value
m is special key, to judge is push or pull, if vaule is publish the mode is pushotherwise is play, if m not exist, mode is play
other key and m ,can use by webhook to auth for play or push
like
#!::h=zlmediakit.com,r=live/test,m=publish
vhost = zlmediakit.com
app = live
streamid = test
mode is push
- OBS push stream url - OBS push stream url
`srt://192.168.1.105:9000?streamid=__defaultVhost__/live/test?type=push` `srt://192.168.1.105:9000?streamid=#!::r=live/test,m=publish`
- ffmpeg push - ffmpeg push
`ffmpeg -re -stream_loop -1 -i test.ts -c:v copy -c:a copy -f mpegts srt://192.168.1.105:9000?streamid="__defaultVhost__/live/test?type=push"` `ffmpeg -re -stream_loop -1 -i test.ts -c:v copy -c:a copy -f mpegts srt://192.168.1.105:9000?streamid=#!::r=live/test,m=publish`
- ffplay pull - ffplay pull
`ffplay -i srt://192.168.1.105:9000?streamid=__defaultVhost__/live/test` `ffplay -i srt://192.168.1.105:9000?streamid=#!::r=live/test`
- vlc not support ,because can't set stream id [reference](https://github.com/Haivision/srt/issues/1015) - vlc not support ,because can't set stream id [reference](https://github.com/Haivision/srt/issues/1015)