初步实现hls播放器个数计数

This commit is contained in:
xiongziliang 2019-12-28 16:48:11 +08:00
parent 20d56b713f
commit 94806b2cd6
19 changed files with 111 additions and 59 deletions

8
.gitignore vendored
View File

@ -32,6 +32,12 @@
*.DS_Store *.DS_Store
/cmake-build-debug/ /cmake-build-debug/
/cmake-build-release/
/linux/
/.vs/
/.idea/ /.idea/
/c_wrapper/.idea/ /c_wrapper/.idea/
/release/mac/Debug/ /release/
/Android/.idea/
/Android/app/src/main/cpp/libs_export/
/3rdpart/media-server/.idea/

View File

@ -232,6 +232,14 @@ void FFmpegSource::onNoneReader(MediaSource &sender) {
} }
} }
int FFmpegSource::totalReaderCount(MediaSource &sender) {
auto listener = _listener.lock();
if(listener){
return listener->totalReaderCount(sender);
}
return 0;
}
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
_listener = src->getListener(); _listener = src->getListener();
src->setListener(shared_from_this()); src->setListener(shared_from_this());

View File

@ -57,8 +57,10 @@ private:
void startTimer(int timeout_ms); void startTimer(int timeout_ms);
void onGetMediaSource(const MediaSource::Ptr &src); void onGetMediaSource(const MediaSource::Ptr &src);
//MediaSourceEvent override
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override ; void onNoneReader(MediaSource &sender) override ;
int totalReaderCount(MediaSource &sender) override;
private: private:
Process _process; Process _process;
Timer::Ptr _timer; Timer::Ptr _timer;

View File

@ -98,6 +98,13 @@ const std::weak_ptr<MediaSourceEvent>& MediaSource::getListener() const{
return _listener; return _listener;
} }
int MediaSource::totalReaderCount(){
auto listener = _listener.lock();
if(!listener){
return readerCount();
}
return listener->totalReaderCount(*this);
}
bool MediaSource::seekTo(uint32_t ui32Stamp) { bool MediaSource::seekTo(uint32_t ui32Stamp) {
auto listener = _listener.lock(); auto listener = _listener.lock();
if(!listener){ if(!listener){

View File

@ -67,6 +67,9 @@ public:
// 通知无人观看 // 通知无人观看
virtual void onNoneReader(MediaSource &sender); virtual void onNoneReader(MediaSource &sender);
// 观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0;
}; };
class MediaInfo{ class MediaInfo{
@ -124,8 +127,10 @@ public:
void setTrackSource(const std::weak_ptr<TrackSource> &track_src); void setTrackSource(const std::weak_ptr<TrackSource> &track_src);
// 设置监听者 // 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener); virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取观看者个数 // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人
virtual int readerCount() = 0; virtual int readerCount() = 0;
// 观看者个数,包括(hls/rtsp/rtmp)
virtual int totalReaderCount();
// 获取流当前时间戳 // 获取流当前时间戳
virtual uint32_t getTimeStamp(TrackType trackType) = 0; virtual uint32_t getTimeStamp(TrackType trackType) = 0;

View File

@ -30,6 +30,7 @@
#include "Rtsp/RtspMediaSourceMuxer.h" #include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h" #include "Rtmp/RtmpMediaSourceMuxer.h"
#include "Record/Recorder.h" #include "Record/Recorder.h"
#include "Record/HlsManager.h"
class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this<MultiMediaSourceMuxer>{ class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public: public:
@ -64,6 +65,9 @@ public:
Recorder::startRecord(Recorder::type_mp4,vhost, strApp, strId, true, false); Recorder::startRecord(Recorder::type_mp4,vhost, strApp, strId, true, false);
} }
_get_hls_player = [vhost,strApp,strId](){
return HlsManager::Instance().hlsPlayerCount(vhost,strApp,strId);
};
} }
virtual ~MultiMediaSourceMuxer(){} virtual ~MultiMediaSourceMuxer(){}
@ -96,8 +100,8 @@ public:
* *
* @return * @return
*/ */
int readerCount() const{ int totalReaderCount() const{
return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0); return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) + _get_hls_player();
} }
void setTimeStamp(uint32_t stamp){ void setTimeStamp(uint32_t stamp){
@ -157,6 +161,7 @@ private:
RtmpMediaSourceMuxer::Ptr _rtmp; RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp; RtspMediaSourceMuxer::Ptr _rtsp;
Listener *_listener = nullptr; Listener *_listener = nullptr;
function<int()> _get_hls_player;
}; };

View File

@ -76,8 +76,8 @@ bool HttpServerCookie::isExpired() {
return _ticker.elapsedTime() > _max_elapsed * 1000; return _ticker.elapsedTime() > _max_elapsed * 1000;
} }
std::shared_ptr<lock_guard<mutex> > HttpServerCookie::getLock(){ std::shared_ptr<lock_guard<recursive_mutex> > HttpServerCookie::getLock(){
return std::make_shared<lock_guard<mutex> >(_mtx); return std::make_shared<lock_guard<recursive_mutex> >(_mtx);
} }
string HttpServerCookie::cookieExpireTime() const{ string HttpServerCookie::cookieExpireTime() const{

View File

@ -108,9 +108,7 @@ public:
* *
* @return * @return
*/ */
std::shared_ptr<lock_guard<mutex> > getLock(); std::shared_ptr<lock_guard<recursive_mutex> > getLock();
private: private:
string cookieExpireTime() const ; string cookieExpireTime() const ;
private: private:
@ -119,7 +117,7 @@ private:
string _cookie_uuid; string _cookie_uuid;
uint64_t _max_elapsed; uint64_t _max_elapsed;
Ticker _ticker; Ticker _ticker;
mutex _mtx; recursive_mutex _mtx;
std::weak_ptr<HttpCookieManager> _manager; std::weak_ptr<HttpCookieManager> _manager;
}; };

View File

@ -32,6 +32,7 @@
#include "HttpFileManager.h" #include "HttpFileManager.h"
#include "Util/File.h" #include "Util/File.h"
#include "HttpSession.h" #include "HttpSession.h"
#include "Record/HlsManager.h"
namespace mediakit { namespace mediakit {
@ -44,6 +45,7 @@ static const string kCookiePathKey = "kCookiePathKey";
static const string kAccessErrKey = "kAccessErrKey"; static const string kAccessErrKey = "kAccessErrKey";
static const string kAccessHls = "kAccessHls"; static const string kAccessHls = "kAccessHls";
static const string kHlsSuffix = "/hls.m3u8"; static const string kHlsSuffix = "/hls.m3u8";
static const string kHlsData = "kHlsData";
static const string &getContentType(const char *name) { static const string &getContentType(const char *name) {
const char *dot; const char *dot;
@ -284,9 +286,8 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
bool is_hls = end_of(path,kHlsSuffix); bool is_hls = end_of(path,kHlsSuffix);
//该用户从来未获取过cookie这个时候我们广播是否允许该用户访问该http目录 //该用户从来未获取过cookie这个时候我们广播是否允许该用户访问该http目录
HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls]( const string &errMsg, HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, mediaInfo]
const string &cookie_path_in, (const string &errMsg, const string &cookie_path_in, int cookieLifeSecond) {
int cookieLifeSecond) {
HttpServerCookie::Ptr cookie; HttpServerCookie::Ptr cookie;
if (cookieLifeSecond) { if (cookieLifeSecond) {
//本次鉴权设置了有效期我们把鉴权结果缓存在cookie中 //本次鉴权设置了有效期我们把鉴权结果缓存在cookie中
@ -305,8 +306,15 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
(*cookie)[kAccessErrKey].set<string>(errMsg); (*cookie)[kAccessErrKey].set<string>(errMsg);
//记录访问的是否为hls //记录访问的是否为hls
(*cookie)[kAccessHls].set<bool>(is_hls); (*cookie)[kAccessHls].set<bool>(is_hls);
if(is_hls){
//hls相关信息
replace(const_cast<string &>(mediaInfo._streamid),kHlsSuffix,"");
(*cookie)[kHlsData].set<HlsCookieData>(mediaInfo);
}
callback(errMsg, cookie);
}else{
callback(errMsg, nullptr);
} }
callback(errMsg, cookie);
}; };
if (is_hls && emitHlsPlayed(parser, mediaInfo, path, is_dir, accessPathInvoker, sender)) { if (is_hls && emitHlsPlayed(parser, mediaInfo, path, is_dir, accessPathInvoker, sender)) {
@ -369,6 +377,12 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) {
if(cookie){
auto is_hls = (*cookie)[kAccessHls].get<bool>();
if(is_hls){
(*cookie)[kHlsData].get<HlsCookieData>().addByteUsage(body->remainSize());
}
}
cb(codeOut.data(), getContentType(strFile.data()), headerOut, body); cb(codeOut.data(), getContentType(strFile.data()), headerOut, body);
}; };
invoker.responseFile(parser.getValues(), httpHeader, strFile); invoker.responseFile(parser.getValues(), httpHeader, strFile);

View File

@ -174,12 +174,8 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
}, getPoller()); }, getPoller());
} }
int PlayerProxy::readerCount(){
return (_mediaMuxer ? _mediaMuxer->readerCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
}
bool PlayerProxy::close(MediaSource &sender,bool force) { bool PlayerProxy::close(MediaSource &sender,bool force) {
if(!force && readerCount() != 0){ if(!force && totalReaderCount()){
return false; return false;
} }
@ -201,12 +197,20 @@ bool PlayerProxy::close(MediaSource &sender,bool force) {
} }
void PlayerProxy::onNoneReader(MediaSource &sender) { void PlayerProxy::onNoneReader(MediaSource &sender) {
if(!_mediaMuxer || _mediaMuxer->readerCount() != 0){ if(!_mediaMuxer || totalReaderCount()){
return; return;
} }
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
int PlayerProxy::totalReaderCount(){
return (_mediaMuxer ? _mediaMuxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
}
int PlayerProxy::totalReaderCount(MediaSource &sender) {
return totalReaderCount();
}
class MuteAudioMaker : public FrameDispatcher{ class MuteAudioMaker : public FrameDispatcher{
public: public:
typedef std::shared_ptr<MuteAudioMaker> Ptr; typedef std::shared_ptr<MuteAudioMaker> Ptr;

View File

@ -75,18 +75,15 @@ public:
* @param strUrl * @param strUrl
*/ */
void play(const string &strUrl) override; void play(const string &strUrl) override;
/**
*
* @return
*/
bool close(MediaSource &sender,bool force) override;
private: private:
//MediaSourceEvent override
bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override;
int totalReaderCount() ;
void rePlay(const string &strUrl,int iFailedCnt); void rePlay(const string &strUrl,int iFailedCnt);
void onPlaySuccess(); void onPlaySuccess();
int readerCount() ;
private: private:
bool _bEnableRtsp; bool _bEnableRtsp;
bool _bEnableRtmp; bool _bEnableRtmp;

View File

@ -179,7 +179,7 @@ void MP4Reader::startReadMP4() {
return true; return true;
} }
bool MP4Reader::close(MediaSource &sender,bool force){ bool MP4Reader::close(MediaSource &sender,bool force){
if(!_mediaMuxer || (!force && _mediaMuxer->readerCount() != 0)){ if(!_mediaMuxer || (!force && _mediaMuxer->totalReaderCount())){
return false; return false;
} }
_timer.reset(); _timer.reset();
@ -188,18 +188,22 @@ bool MP4Reader::close(MediaSource &sender,bool force){
} }
void MP4Reader::onNoneReader(MediaSource &sender) { void MP4Reader::onNoneReader(MediaSource &sender) {
if(!_mediaMuxer || _mediaMuxer->readerCount() != 0){ if(!_mediaMuxer || _mediaMuxer->totalReaderCount()){
return; return;
} }
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
int MP4Reader::totalReaderCount(MediaSource &sender) {
return _mediaMuxer ? _mediaMuxer->totalReaderCount() : sender.readerCount();
}
bool MP4Reader::readSample(int iTimeInc,bool justSeekSyncFrame) { bool MP4Reader::readSample(int iTimeInc,bool justSeekSyncFrame) {
TimeTicker(); TimeTicker();
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
auto bFlag0 = readVideoSample(iTimeInc,justSeekSyncFrame);//数据没读完 auto bFlag0 = readVideoSample(iTimeInc,justSeekSyncFrame);//数据没读完
auto bFlag1 = readAudioSample(iTimeInc,justSeekSyncFrame);//数据没读完 auto bFlag1 = readAudioSample(iTimeInc,justSeekSyncFrame);//数据没读完
auto bFlag2 = _mediaMuxer->readerCount() > 0;//读取者大于0 auto bFlag2 = _mediaMuxer->totalReaderCount() > 0;//读取者大于0
if((bFlag0 || bFlag1) && bFlag2){ if((bFlag0 || bFlag1) && bFlag2){
_alive.resetTime(); _alive.resetTime();
} }

View File

@ -56,19 +56,6 @@ public:
*/ */
void startReadMP4(); void startReadMP4();
/**
*
* @param ui32Stamp
* @return
*/
bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override;
/**
* MP4Reader的流化进程
* @return
*/
bool close(MediaSource &sender,bool force) override;
/** /**
* MP4Reader对象然后查找相关的MediaSource对象 * MP4Reader对象然后查找相关的MediaSource对象
* @param strSchema * @param strSchema
@ -87,7 +74,11 @@ public:
bool checkApp = true); bool checkApp = true);
private: private:
//MediaSourceEvent override
bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override;
bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override;
#ifdef ENABLE_MP4V2 #ifdef ENABLE_MP4V2
void seek(uint32_t iSeekTime,bool bReStart = true); void seek(uint32_t iSeekTime,bool bReStart = true);
inline void setSeekTime(uint32_t iSeekTime); inline void setSeekTime(uint32_t iSeekTime);

View File

@ -88,11 +88,11 @@ public:
} }
} }
/** /**
* * (hls/rtsp/rtmp)
*/ */
int readerCount() override { int totalReaderCount() override{
return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
} }
/** /**

View File

@ -532,7 +532,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
bool RtmpSession::close(MediaSource &sender,bool force) { bool RtmpSession::close(MediaSource &sender,bool force) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_pPublisherSrc || (!force && _pPublisherSrc->readerCount() != 0)){ if(!_pPublisherSrc || (!force && _pPublisherSrc->totalReaderCount())){
return false; return false;
} }
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
@ -542,12 +542,16 @@ bool RtmpSession::close(MediaSource &sender,bool force) {
void RtmpSession::onNoneReader(MediaSource &sender) { void RtmpSession::onNoneReader(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_pPublisherSrc || _pPublisherSrc->readerCount() != 0){ if(!_pPublisherSrc || _pPublisherSrc->totalReaderCount()){
return; return;
} }
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
int RtmpSession::totalReaderCount(MediaSource &sender) {
return _pPublisherSrc ? _pPublisherSrc->totalReaderCount() : sender.readerCount();
}
void RtmpSession::setSocketFlags(){ void RtmpSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay); GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) { if(!ultraLowDelay) {

View File

@ -83,8 +83,11 @@ private:
sendResponse(MSG_CMD, invoke.data()); sendResponse(MSG_CMD, invoke.data());
} }
bool close(MediaSource &sender,bool force) override ; //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override;
void setSocketFlags(); void setSocketFlags();
string getStreamId(const string &str); string getStreamId(const string &str);
void dumpMetadata(const AMFValue &metadata); void dumpMetadata(const AMFValue &metadata);

View File

@ -80,10 +80,10 @@ public:
} }
/** /**
* * (hls/rtsp/rtmp)
*/ */
int readerCount() override { int totalReaderCount() override{
return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
} }
/** /**

View File

@ -1136,7 +1136,7 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
bool RtspSession::close(MediaSource &sender,bool force) { bool RtspSession::close(MediaSource &sender,bool force) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_pushSrc || (!force && _pushSrc->readerCount() != 0)){ if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){
return false; return false;
} }
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
@ -1147,12 +1147,15 @@ bool RtspSession::close(MediaSource &sender,bool force) {
void RtspSession::onNoneReader(MediaSource &sender){ void RtspSession::onNoneReader(MediaSource &sender){
//此回调在其他线程触发 //此回调在其他线程触发
if(!_pushSrc || _pushSrc->readerCount() != 0){ if(!_pushSrc || _pushSrc->totalReaderCount()){
return; return;
} }
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
int RtspSession::totalReaderCount(MediaSource &sender) {
return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount();
}
void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoP(this) <<(int)pkt.Interleaved; //InfoP(this) <<(int)pkt.Interleaved;

View File

@ -108,8 +108,9 @@ protected:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override ; bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override;
//TcpSession override //TcpSession override
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;
/** /**