Merge branch 'master' into cus_pr

同步代码
This commit is contained in:
custompal 2022-07-15 19:24:44 +08:00
commit a0de7577fe
56 changed files with 3351 additions and 1594 deletions

View File

@ -71,6 +71,7 @@ SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
# 空 {} 中不加空格
SpaceInEmptyBlock: false
Standard: C++11
# Tab 占 4 位
TabWidth: 4
# 不使用 TAB

View File

@ -1,4 +1,4 @@
name: Android CI
name: Android
on: [push, pull_request]
jobs:
build:

View File

@ -1,4 +1,4 @@
name: linux C/C++ CI
name: Linux
on: [push, pull_request]

View File

@ -1,4 +1,4 @@
name: macos C/C++ CI
name: macOS
on: [push, pull_request]

27
.github/workflows/style.yml vendored Normal file
View File

@ -0,0 +1,27 @@
name: style check
on: [pull_request]
jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
# with all history
fetch-depth: 0
- name: Validate BOM
run: |
ret=0
for i in $(git diff --name-only origin/${GITHUB_BASE_REF}...${GITHUB_SHA}); do
if [ -f ${i} ]; then
case ${i} in
*.c|*.cc|*.cpp|*.h)
if file ${i} | grep -qv BOM; then
echo "Missing BOM in ${i}" && ret=1;
fi
;;
esac
fi
done
exit ${ret}

View File

@ -1,4 +1,4 @@
name: MSVC C/C++ CI
name: Windows
on: [push, pull_request]

@ -1 +1 @@
Subproject commit 136b6b2f28193da218f577423db217aeb0f7aa6a
Subproject commit 61f2c6c8d4288c2c60299a84473d9cfec113891c

View File

@ -22,6 +22,10 @@ endif()
set(CMAKE_CXX_STANDARD 11)
#
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
set(DEP_ROOT_DIR ${CMAKE_SOURCE_DIR}/3rdpart/external-${CMAKE_SYSTEM_NAME})
if(NOT EXISTS ${DEP_ROOT_DIR})
file(MAKE_DIRECTORY ${DEP_ROOT_DIR})
endif()
find_program(CCACHE_FOUND ccache)
if(CCACHE_FOUND)
@ -108,6 +112,7 @@ option(ENABLE_MSVC_MT "Enable MSVC Mt/Mtd lib" true)
option(ENABLE_API_STATIC_LIB "Enable mk_api static lib" false)
option(USE_SOLUTION_FOLDERS "Enable solution dir supported" ON)
option(ENABLE_SRT "Enable SRT" true)
option(ENABLE_JEMALLOC_STATIC "Enable static linking to the jemalloc library" false)
# ----------------------------------------------------------------------------
# Solution folders:
# ----------------------------------------------------------------------------
@ -299,6 +304,13 @@ if (ENABLE_FFMPEG)
endif ()
if(ENABLE_JEMALLOC_STATIC)
include(cmake/Jemalloc.cmake)
include_directories(${DEP_ROOT_DIR}/${JEMALLOC_NAME}/include/jemalloc)
link_directories(${DEP_ROOT_DIR}/${JEMALLOC_NAME}/lib)
set(JEMALLOC_ROOT_DIR "${DEP_ROOT_DIR}/${JEMALLOC_NAME}")
endif ()
#jemalloc
find_package(JEMALLOC QUIET)
if (JEMALLOC_FOUND)

View File

@ -1,13 +1,19 @@
![logo](https://raw.githubusercontent.com/xia-chu/ZLMediaKit/master/www/logo.png)
![logo](https://raw.githubusercontent.com/ZLMediaKit/ZLMediaKit/master/www/logo.png)
# 一个基于C++11的高性能运营级流媒体服务框架
[![license](http://img.shields.io/badge/license-MIT-green.svg)](https://github.com/xia-chu/ZLMediaKit/blob/master/LICENSE)
[![C++](https://img.shields.io/badge/language-c++-red.svg)](https://en.cppreference.com/)
[![platform](https://img.shields.io/badge/platform-linux%20|%20macos%20|%20windows-blue.svg)](https://github.com/xia-chu/ZLMediaKit)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-yellow.svg)](https://github.com/xia-chu/ZLMediaKit/pulls)
[![Build Status](https://travis-ci.org/xia-chu/ZLMediaKit.svg?branch=master)](https://travis-ci.org/xia-chu/ZLMediaKit)
[![Docker](https://img.shields.io/docker/pulls/zlmediakit/zlmediakit)](https://hub.docker.com/r/zlmediakit/zlmediakit/tags)
[![](https://img.shields.io/badge/license-MIT-green.svg)](https://github.com/ZLMediaKit/ZLMediaKit/blob/master/LICENSE)
[![](https://img.shields.io/badge/language-c++-red.svg)](https://en.cppreference.com/)
[![](https://img.shields.io/badge/platform-linux%20|%20macos%20|%20windows-blue.svg)](https://github.com/ZLMediaKit/ZLMediaKit)
[![](https://img.shields.io/badge/PRs-welcome-yellow.svg)](https://github.com/ZLMediaKit/ZLMediaKit/pulls)
[![](https://github.com/ZLMediaKit/ZLMediaKit/actions/workflows/android.yml/badge.svg)](https://github.com/ZLMediaKit/ZLMediaKit)
[![](https://github.com/ZLMediaKit/ZLMediaKit/actions/workflows/linux.yml/badge.svg)](https://github.com/ZLMediaKit/ZLMediaKit)
[![](https://github.com/ZLMediaKit/ZLMediaKit/actions/workflows/macos.yml/badge.svg)](https://github.com/ZLMediaKit/ZLMediaKit)
[![](https://github.com/ZLMediaKit/ZLMediaKit/actions/workflows/windows.yml/badge.svg)](https://github.com/ZLMediaKit/ZLMediaKit)
[![](https://github.com/ZLMediaKit/ZLMediaKit/actions/workflows/docker.yml/badge.svg)](https://hub.docker.com/r/zlmediakit/zlmediakit/tags)
[![](https://img.shields.io/docker/pulls/zlmediakit/zlmediakit)](https://hub.docker.com/r/zlmediakit/zlmediakit/tags)
## 项目特点
@ -16,10 +22,10 @@
- 使用多路复用/多线程/异步网络IO模式开发并发性能优越支持海量客户端连接。
- 代码经过长期大量的稳定性、性能测试,已经在线上商用验证已久。
- 支持linux、macos、ios、android、windows全平台。
- 支持画面秒开、极低延时([500毫秒内最低可达100毫秒](https://github.com/xia-chu/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95))。
- 提供完善的标准[C API](https://github.com/xia-chu/ZLMediaKit/tree/master/api/include),可以作SDK用或供其他语言调用。
- 提供完整的[MediaServer](https://github.com/xia-chu/ZLMediaKit/tree/master/server)服务器,可以免开发直接部署为商用服务器。
- 提供完善的[restful api](https://github.com/xia-chu/ZLMediaKit/wiki/MediaServer%E6%94%AF%E6%8C%81%E7%9A%84HTTP-API)以及[web hook](https://github.com/xia-chu/ZLMediaKit/wiki/MediaServer%E6%94%AF%E6%8C%81%E7%9A%84HTTP-HOOK-API),支持丰富的业务逻辑。
- 支持画面秒开、极低延时([500毫秒内最低可达100毫秒](https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95))。
- 提供完善的标准[C API](https://github.com/ZLMediaKit/ZLMediaKit/tree/master/api/include),可以作SDK用或供其他语言调用。
- 提供完整的[MediaServer](https://github.com/ZLMediaKit/ZLMediaKit/tree/master/server)服务器,可以免开发直接部署为商用服务器。
- 提供完善的[restful api](https://github.com/ZLMediaKit/ZLMediaKit/wiki/MediaServer%E6%94%AF%E6%8C%81%E7%9A%84HTTP-API)以及[web hook](https://github.com/ZLMediaKit/ZLMediaKit/wiki/MediaServer%E6%94%AF%E6%8C%81%E7%9A%84HTTP-HOOK-API),支持丰富的业务逻辑。
- 打通了视频监控协议栈与直播协议栈对RTSP/RTMP支持都很完善。
- 全面支持H265/H264/AAC/G711/OPUS。
- 功能完善,支持集群、按需转协议、按需推拉流、先播后推、断连续推等功能。
@ -58,7 +64,7 @@
- 支持websocket-flv直播
- 支持H264/H265/AAC/G711/OPUS编码其他编码能转发但不能转协议
- 支持[RTMP-H265](https://github.com/ksvc/FFmpeg/wiki)
- 支持[RTMP-OPUS](https://github.com/xia-chu/ZLMediaKit/wiki/RTMP%E5%AF%B9H265%E5%92%8COPUS%E7%9A%84%E6%94%AF%E6%8C%81)
- 支持[RTMP-OPUS](https://github.com/ZLMediaKit/ZLMediaKit/wiki/RTMP%E5%AF%B9H265%E5%92%8COPUS%E7%9A%84%E6%94%AF%E6%8C%81)
- HLS
- 支持HLS文件生成自带HTTP文件服务器
@ -126,15 +132,15 @@
## 编译以及测试
**编译前务必仔细参考wiki:[快速开始](https://github.com/xia-chu/ZLMediaKit/wiki/%E5%BF%AB%E9%80%9F%E5%BC%80%E5%A7%8B)操作!!!**
**编译前务必仔细参考wiki:[快速开始](https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E5%BF%AB%E9%80%9F%E5%BC%80%E5%A7%8B)操作!!!**
## 怎么使用
你有三种方法使用ZLMediaKit分别是
- 1、使用c api作为sdk使用请参考[这里](https://github.com/xia-chu/ZLMediaKit/tree/master/api/include).
- 2、作为独立的流媒体服务器使用不想做c/c++开发的,可以参考 [restful api](https://github.com/xia-chu/ZLMediaKit/wiki/MediaServer支持的HTTP-API) 和 [web hook](https://github.com/xia-chu/ZLMediaKit/wiki/MediaServer支持的HTTP-HOOK-API ).
- 3、如果想做c/c++开发,添加业务逻辑增加功能,可以参考这里的[测试程序](https://github.com/xia-chu/ZLMediaKit/tree/master/tests).
- 1、使用c api作为sdk使用请参考[这里](https://github.com/ZLMediaKit/ZLMediaKit/tree/master/api/include).
- 2、作为独立的流媒体服务器使用不想做c/c++开发的,可以参考 [restful api](https://github.com/ZLMediaKit/ZLMediaKit/wiki/MediaServer支持的HTTP-API) 和 [web hook](https://github.com/ZLMediaKit/ZLMediaKit/wiki/MediaServer支持的HTTP-HOOK-API ).
- 3、如果想做c/c++开发,添加业务逻辑增加功能,可以参考这里的[测试程序](https://github.com/ZLMediaKit/ZLMediaKit/tree/master/tests).
## Docker 镜像
@ -167,6 +173,7 @@ bash build_docker_images.sh
- [Go实现的海康ehome服务器](https://github.com/tsingeye/FreeEhome)
- 客户端
- [c sdk完整c#包装库](https://github.com/malegend/ZLMediaKit.Autogen)
- [基于C SDK实现的推流客户端](https://github.com/hctym1995/ZLM_ApiDemo)
- [C#版本的Http API与Hook](https://github.com/chengxiaosheng/ZLMediaKit.HttpApi)
- [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk)
@ -195,7 +202,7 @@ bash build_docker_images.sh
- 1、仔细看下readme、wiki如果有必要可以查看下issue.
- 2、如果您的问题还没解决可以提issue.
- 3、有些问题如果不具备参考性的无需在issue提的可以在qq群提.
- 4、QQ私聊一般不接受无偿技术咨询和支持([为什么不提倡QQ私聊](https://github.com/xia-chu/ZLMediaKit/wiki/%E4%B8%BA%E4%BB%80%E4%B9%88%E4%B8%8D%E5%BB%BA%E8%AE%AEQQ%E7%A7%81%E8%81%8A%E5%92%A8%E8%AF%A2%E9%97%AE%E9%A2%98%EF%BC%9F)).
- 4、QQ私聊一般不接受无偿技术咨询和支持([为什么不提倡QQ私聊](https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E4%B8%BA%E4%BB%80%E4%B9%88%E4%B8%8D%E5%BB%BA%E8%AE%AEQQ%E7%A7%81%E8%81%8A%E5%92%A8%E8%AF%A2%E9%97%AE%E9%A2%98%EF%BC%9F)).
## 特别感谢
@ -278,4 +285,4 @@ bash build_docker_images.sh
本项目已经得到不少公司和个人开发者的认可,据作者不完全统计,
使用本项目的公司包括知名的互联网巨头、国内排名前列的云服务公司、多家知名的AI独角兽公司
以及一系列中小型公司。使用者可以通过在 [issue](https://github.com/xia-chu/ZLMediaKit/issues/511) 上粘贴公司的大名和相关项目介绍为本项目背书,感谢支持!
以及一系列中小型公司。使用者可以通过在 [issue](https://github.com/ZLMediaKit/ZLMediaKit/issues/511) 上粘贴公司的大名和相关项目介绍为本项目背书,感谢支持!

View File

@ -187,7 +187,6 @@ API_EXPORT int API_CALL mk_media_init_video(mk_media ctx, int codec_id, int widt
info.iWidth = width;
info.iHeight = height;
info.iBitRate = bit_rate;
(*obj)->getChannel()->initVideo(info);
return (*obj)->getChannel()->initVideo(info);
}

View File

@ -10,12 +10,13 @@
#include "mk_track.h"
#include "Extension/Track.h"
#include "Extension/Factory.h"
using namespace std;
using namespace toolkit;
using namespace mediakit;
class VideoTrackForC : public VideoTrack {
class VideoTrackForC : public VideoTrack, public std::enable_shared_from_this<VideoTrackForC> {
public:
VideoTrackForC(int codec_id, codec_args *args) {
_codec_id = (CodecId) codec_id;
@ -49,7 +50,8 @@ public:
}
Track::Ptr clone() override {
return std::make_shared<std::remove_reference<decltype(*this)>::type>(*this);
auto track_in = std::shared_ptr<Track>(shared_from_this());
return Factory::getTrackByAbstractTrack(track_in);
}
Sdp::Ptr getSdp() override {
@ -61,7 +63,7 @@ private:
codec_args _args;
};
class AudioTrackForC : public AudioTrackImp {
class AudioTrackForC : public AudioTrackImp, public std::enable_shared_from_this<AudioTrackForC> {
public:
~AudioTrackForC() override = default;
@ -69,7 +71,8 @@ public:
AudioTrackImp((CodecId) codec_id, args->audio.sample_rate, args->audio.channels, 16) {}
Track::Ptr clone() override {
return std::make_shared<std::remove_reference<decltype(*this)>::type>(*this);
auto track_in = std::shared_ptr<Track>(shared_from_this());
return Factory::getTrackByAbstractTrack(track_in);
}
Sdp::Ptr getSdp() override {

View File

@ -64,8 +64,12 @@ int main(int argc, char *argv[]) {
mk_media media = mk_media_create("__defaultVhost__", "live", "test", 0, 0, 0);
//h264的codec
mk_media_init_video(media, 0, 0, 0, 0, 2 * 104 * 1024);
//mk_media_init_video(media, 0, 0, 0, 0, 2 * 104 * 1024);
codec_args v_args={0};
mk_track v_track = mk_track_create(MKCodecH264,&v_args);
mk_media_init_track(media,v_track);
mk_media_init_complete(media);
mk_track_unref(v_track);
//创建h264分帧器
mk_h264_splitter splitter = mk_h264_splitter_create(on_h264_frame, media);

View File

@ -1,16 +1,49 @@
# Tries to find Jemalloc headers and libraries.
#
# Usage of this module as follows:
#
# find_package(jemalloc)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# JEMALLOC_ROOT_DIR Set this variable to the root installation of
# Jemalloc if the module has problems finding
# the proper installation path.
#
# Variables defined by this module:
#
# JEMALLOC_FOUND System has Jemalloc libs/headers
# JEMALLOC_LIBRARIES The Jemalloc libraries
# JEMALLOC_INCLUDE_DIR The location of Jemalloc headers
if (ENABLE_JEMALLOC_STATIC)
find_path(JEMALLOC_INCLUDE_DIR
NAMES jemalloc.h
HINTS ${JEMALLOC_ROOT_DIR}/include/jemalloc
NO_DEFAULT_PATH)
find_library(JEMALLOC_LIBRARIES
NAMES jemalloc
HINTS ${JEMALLOC_ROOT_DIR}/lib
NO_DEFAULT_PATH)
else ()
find_path(JEMALLOC_INCLUDE_DIR
NAMES jemalloc/jemalloc.h
)
find_library(JEMALLOC_LIBRARY
find_library(JEMALLOC_LIBRARIES
NAMES jemalloc
)
set(JEMALLOC_INCLUDE_DIRS ${JEMALLOC_INCLUDE_DIR})
set(JEMALLOC_LIBRARIES ${JEMALLOC_LIBRARY})
endif ()
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(JEMALLOC DEFAULT_MSG
JEMALLOC_LIBRARIES
JEMALLOC_INCLUDE_DIR)
find_package_handle_standard_args(JEMALLOC DEFAULT_MSG JEMALLOC_LIBRARY JEMALLOC_INCLUDE_DIR)
mark_as_advanced(
JEMALLOC_ROOT_DIR
JEMALLOC_LIBRARIES
JEMALLOC_INCLUDE_DIR)

50
cmake/Jemalloc.cmake Normal file
View File

@ -0,0 +1,50 @@
# Download and build Jemalloc
set(JEMALLOC_VERSION 5.2.1)
set(JEMALLOC_NAME jemalloc-${JEMALLOC_VERSION})
set(JEMALLOC_TAR_PATH ${DEP_ROOT_DIR}/${JEMALLOC_NAME}.tar.bz2)
list(APPEND jemalloc_CONFIG_ARGS --disable-initial-exec-tls)
list(APPEND jemalloc_CONFIG_ARGS --without-export)
list(APPEND jemalloc_CONFIG_ARGS --disable-stats)
list(APPEND jemalloc_CONFIG_ARGS --disable-libdl)
#list(APPEND jemalloc_CONFIG_ARGS --disable-cxx)
#list(APPEND jemalloc_CONFIG_ARGS --with-jemalloc-prefix=je_)
#list(APPEND jemalloc_CONFIG_ARGS --enable-debug)
if(NOT EXISTS ${JEMALLOC_TAR_PATH})
message(STATUS "Downloading ${JEMALLOC_NAME}...")
file(DOWNLOAD https://github.com/jemalloc/jemalloc/releases/download/${JEMALLOC_VERSION}/${JEMALLOC_NAME}.tar.bz2
${JEMALLOC_TAR_PATH})
endif()
SET( DIR_CONTAINING_JEMALLOC ${DEP_ROOT_DIR}/${JEMALLOC_NAME} )
if(NOT EXISTS ${DIR_CONTAINING_JEMALLOC})
message(STATUS "Extracting jemalloc...")
execute_process(COMMAND ${CMAKE_COMMAND} -E tar xzf ${JEMALLOC_TAR_PATH} WORKING_DIRECTORY ${DEP_ROOT_DIR})
endif()
if(NOT EXISTS ${DIR_CONTAINING_JEMALLOC}/Makefile)
message("Configuring jemalloc locally...")
# Builds with "--with-jemalloc-prefix=je_" on OSX
# SET( BASH_COMMAND_TO_RUN bash -l -c "cd ${DIR_CONTAINING_JEMALLOC} && ./configure ${jemalloc_CONFIG_ARGS}" )
#
# EXECUTE_PROCESS( COMMAND ${BASH_COMMAND_TO_RUN}
# WORKING_DIRECTORY ${DIR_CONTAINING_JEMALLOC} RESULT_VARIABLE JEMALLOC_CONFIGURE )
execute_process(COMMAND ./configure ${jemalloc_CONFIG_ARGS} WORKING_DIRECTORY ${DIR_CONTAINING_JEMALLOC} RESULT_VARIABLE JEMALLOC_CONFIGURE)
if(NOT JEMALLOC_CONFIGURE EQUAL 0)
message(FATAL_ERROR "${JEMALLOC_NAME} configure failed!")
message("${JEMALLOC_CONFIGURE}")
endif()
endif()
if(NOT EXISTS ${DIR_CONTAINING_JEMALLOC}/lib/libjemalloc.a)
message("Building jemalloc locally...")
execute_process(COMMAND make "build_lib_static" WORKING_DIRECTORY ${DIR_CONTAINING_JEMALLOC})
if(NOT EXISTS ${DIR_CONTAINING_JEMALLOC}/lib/libjemalloc.a)
message(FATAL_ERROR "${JEMALLOC_NAME} build failed!")
endif()
endif()

View File

@ -298,6 +298,7 @@ g711a_pt=8
#rtc播放推流、播放超时时间
timeoutSec=15
#本机对rtc客户端的可见ip作为服务器时一般为公网ip可有多个用','分开当置空时会自动获取网卡ip
#同时支持环境变量,以$开头,如"$EXTERN_IP"; 请参考https://github.com/ZLMediaKit/ZLMediaKit/pull/1786
externIP=
#rtc udp服务器监听端口号所有rtc客户端将通过该端口传输stun/dtls/srtp/srtcp数据
#该端口是多线程的,同时支持客户端网络切换导致的连接迁移

View File

@ -47,13 +47,13 @@ static void on_ffmpeg_log(void *ctx, int level, const char *fmt, va_list args) {
}
LogLevel lev;
switch (level) {
case AV_LOG_FATAL: lev = LError; break;
case AV_LOG_FATAL:
case AV_LOG_ERROR: lev = LError; break;
case AV_LOG_WARNING: lev = LWarn; break;
case AV_LOG_INFO: lev = LInfo; break;
case AV_LOG_VERBOSE: lev = LDebug; break;
case AV_LOG_VERBOSE:
case AV_LOG_DEBUG: lev = LDebug; break;
case AV_LOG_TRACE: lev = LTrace; break;
case AV_LOG_TRACE:
default: lev = LTrace; break;
}
LoggerWrapper::printLogV(::toolkit::getLogger(), lev, __FILE__, ctx ? av_default_item_name(ctx) : "NULL", level, fmt, args);
@ -63,7 +63,9 @@ static bool setupFFmpeg_l() {
av_log_set_level(AV_LOG_TRACE);
av_log_set_flags(AV_LOG_PRINT_LEVEL);
av_log_set_callback(on_ffmpeg_log);
#if (LIBAVCODEC_VERSION_MAJOR < 58)
avcodec_register_all();
#endif
return true;
}
@ -243,14 +245,14 @@ AVFrame *FFmpegFrame::get() const {
void FFmpegFrame::fillPicture(AVPixelFormat target_format, int target_width, int target_height) {
assert(_data == nullptr);
_data = new char[avpicture_get_size(target_format, target_width, target_height)];
avpicture_fill((AVPicture *) _frame.get(), (uint8_t *) _data, target_format, target_width, target_height);
_data = new char[av_image_get_buffer_size(target_format, target_width, target_height, 1)];
av_image_fill_arrays(_frame->data, _frame->linesize, (uint8_t *) _data, target_format, target_width, target_height,1);
}
///////////////////////////////////////////////////////////////////////////
template<bool decoder = true>
static inline AVCodec *getCodec_l(const char *name) {
static inline const AVCodec *getCodec_l(const char *name) {
auto codec = decoder ? avcodec_find_decoder_by_name(name) : avcodec_find_encoder_by_name(name);
if (codec) {
InfoL << (decoder ? "got decoder:" : "got encoder:") << name;
@ -261,7 +263,7 @@ static inline AVCodec *getCodec_l(const char *name) {
}
template<bool decoder = true>
static inline AVCodec *getCodec_l(enum AVCodecID id) {
static inline const AVCodec *getCodec_l(enum AVCodecID id) {
auto codec = decoder ? avcodec_find_decoder(id) : avcodec_find_encoder(id);
if (codec) {
InfoL << (decoder ? "got decoder:" : "got encoder:") << avcodec_get_name(id);
@ -277,7 +279,7 @@ public:
CodecName(enum AVCodecID id) : _id(id) {}
template <bool decoder>
AVCodec *getCodec() const {
const AVCodec *getCodec() const {
if (!_codec_name.empty()) {
return getCodec_l<decoder>(_codec_name.data());
}
@ -290,8 +292,8 @@ private:
};
template <bool decoder = true>
static inline AVCodec *getCodec(const std::initializer_list<CodecName> &codec_list) {
AVCodec *ret = nullptr;
static inline const AVCodec *getCodec(const std::initializer_list<CodecName> &codec_list) {
const AVCodec *ret = nullptr;
for (int i = codec_list.size(); i >= 1; --i) {
ret = codec_list.begin()[i - 1].getCodec<decoder>();
if (ret) {
@ -303,8 +305,8 @@ static inline AVCodec *getCodec(const std::initializer_list<CodecName> &codec_li
FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track, int thread_num) {
setupFFmpeg();
AVCodec *codec = nullptr;
AVCodec *codec_default = nullptr;
const AVCodec *codec = nullptr;
const AVCodec *codec_default = nullptr;
switch (track->getCodecId()) {
case CodecH264:
codec_default = getCodec({AV_CODEC_ID_H264});
@ -358,7 +360,9 @@ FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track, int thread_num) {
}
//保存AVFrame的引用
#ifdef FF_API_OLD_ENCDEC
_context->refcounted_frames = 1;
#endif
_context->flags |= AV_CODEC_FLAG_LOW_DELAY;
_context->flags2 |= AV_CODEC_FLAG2_FAST;
if (track->getTrackType() == TrackVideo) {
@ -539,7 +543,7 @@ FFmpegSwr::~FFmpegSwr() {
FFmpegFrame::Ptr FFmpegSwr::inputFrame(const FFmpegFrame::Ptr &frame) {
if (frame->get()->format == _target_format &&
frame->get()->channels == _target_channels &&
frame->get()->channel_layout == _target_channel_layout &&
frame->get()->channel_layout == (uint64_t)_target_channel_layout &&
frame->get()->sample_rate == _target_samplerate) {
//不转格式
return frame;
@ -596,7 +600,8 @@ int FFmpegSws::inputFrame(const FFmpegFrame::Ptr &frame, uint8_t *data) {
}
AVFrame dst;
memset(&dst, 0, sizeof(dst));
avpicture_fill((AVPicture *) &dst, data, _target_format, _target_width, _target_height);
av_image_fill_arrays(dst.data, dst.linesize, data, _target_format, _target_width, _target_height,1);
if (!_ctx) {
_ctx = sws_getContext(frame->get()->width, frame->get()->height, (enum AVPixelFormat) frame->get()->format,
_target_width, _target_height, _target_format, SWS_FAST_BILINEAR, NULL, NULL, NULL);

View File

@ -25,6 +25,7 @@ extern "C" {
#include "libavcodec/avcodec.h"
#include "libswresample/swresample.h"
#include "libavutil/audio_fifo.h"
#include "libavutil/imgutils.h"
#ifdef __cplusplus
}
#endif

View File

@ -354,64 +354,12 @@ void MultiMediaSourceMuxer::resetTracks() {
}
}
//该类实现frame级别的时间戳覆盖
class FrameModifyStamp : public Frame{
public:
typedef std::shared_ptr<FrameModifyStamp> Ptr;
FrameModifyStamp(const Frame::Ptr &frame, Stamp &stamp){
_frame = frame;
//覆盖时间戳
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, true);
}
~FrameModifyStamp() override {}
uint32_t dts() const override{
return (uint32_t)_dts;
}
uint32_t pts() const override{
return (uint32_t)_pts;
}
size_t prefixSize() const override {
return _frame->prefixSize();
}
bool keyFrame() const override {
return _frame->keyFrame();
}
bool configFrame() const override {
return _frame->configFrame();
}
bool cacheAble() const override {
return _frame->cacheAble();
}
char *data() const override {
return _frame->data();
}
size_t size() const override {
return _frame->size();
}
CodecId getCodecId() const override {
return _frame->getCodecId();
}
private:
int64_t _dts;
int64_t _pts;
Frame::Ptr _frame;
};
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
auto frame = frame_in;
if (modify_stamp) {
//开启了时间戳覆盖
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
frame = std::make_shared<FrameStamp>(frame, _stamp[frame->getTrackType()],true);
}
bool ret = false;

View File

@ -274,6 +274,9 @@ bool AACTrack::inputFrame(const Frame::Ptr &frame) {
bool ret = false;
//有adts头尝试分帧
int64_t dts = frame->dts();
int64_t pts = frame->pts();
auto ptr = frame->data();
auto end = frame->data() + frame->size();
while (ptr < end) {
@ -284,7 +287,7 @@ bool AACTrack::inputFrame(const Frame::Ptr &frame) {
if (frame_len == frame->size()) {
return inputFrame_l(frame);
}
auto sub_frame = std::make_shared<FrameInternal<FrameFromPtr> >(frame, (char *) ptr, frame_len, ADTS_HEADER_LEN);
auto sub_frame = std::make_shared<FrameTSInternal<FrameFromPtr> >(frame, (char *) ptr, frame_len, ADTS_HEADER_LEN,dts,pts);
ptr += frame_len;
if (ptr > end) {
WarnL << "invalid aac length in adts header: " << frame_len
@ -295,6 +298,8 @@ bool AACTrack::inputFrame(const Frame::Ptr &frame) {
if (inputFrame_l(sub_frame)) {
ret = true;
}
dts += 1024*1000/getAudioSampleRate();
pts += 1024*1000/getAudioSampleRate();
}
return ret;
}

View File

@ -76,7 +76,7 @@ bool CommonRtpEncoder::inputFrame(const Frame::Ptr &frame){
auto len = frame->size() - frame->prefixSize();
auto remain_size = len;
auto max_size = getMaxSize();
bool is_key = frame->keyFrame();
bool mark = false;
while (remain_size > 0) {
size_t rtp_size;
@ -86,9 +86,10 @@ bool CommonRtpEncoder::inputFrame(const Frame::Ptr &frame){
rtp_size = remain_size;
mark = true;
}
RtpCodec::inputRtp(makeRtp(getTrackType(), ptr, rtp_size, mark, stamp), false);
RtpCodec::inputRtp(makeRtp(getTrackType(), ptr, rtp_size, mark, stamp), is_key);
ptr += rtp_size;
remain_size -= rtp_size;
is_key = false;
}
return len > 0;
}

View File

@ -95,6 +95,32 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
}
}
Track::Ptr Factory::getTrackByAbstractTrack(const Track::Ptr& track) {
auto codec = track->getCodecId();
switch (codec) {
case CodecG711A:
case CodecG711U: {
auto audio_track = dynamic_pointer_cast<AudioTrackImp>(track);
return std::make_shared<G711Track>(codec, audio_track->getAudioSampleRate(), audio_track->getAudioChannel(), 16);
}
case CodecL16: {
auto audio_track = dynamic_pointer_cast<AudioTrackImp>(track);
return std::make_shared<L16Track>(audio_track->getAudioSampleRate(), audio_track->getAudioChannel());
}
case CodecAAC : return std::make_shared<AACTrack>();
case CodecOpus : return std::make_shared<OpusTrack>();
case CodecH265 : return std::make_shared<H265Track>();
case CodecH264 : return std::make_shared<H264Track>();
default: {
//其他codec不支持
WarnL << "暂不支持该该编码类型创建Track:" << track->getCodecName();
return nullptr;
}
}
}
RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
GET_CONFIG(uint32_t,audio_mtu,Rtp::kAudioMtuSize);
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize);

View File

@ -27,6 +27,11 @@ public:
*/
static Track::Ptr getTrackBySdp(const SdpTrack::Ptr &track);
/**
* c api Track生成具体Track对象
*/
static Track::Ptr getTrackByAbstractTrack(const Track::Ptr& track);
/**
* sdp生成rtp编码器
* @param sdp sdp对象

View File

@ -15,6 +15,7 @@
#include <functional>
#include "Util/RingBuffer.h"
#include "Network/Socket.h"
#include "Common/Stamp.h"
namespace mediakit{
@ -262,6 +263,27 @@ private:
Frame::Ptr _parent_frame;
};
/**
* Frame类中可以有多个帧(AAC)
* ZLMediaKit会先把这种复合帧split成单个帧然后再处理
* Frame
*
*/
template<typename Parent>
class FrameTSInternal : public Parent{
public:
typedef std::shared_ptr<FrameTSInternal> Ptr;
FrameTSInternal(const Frame::Ptr &parent_frame, char *ptr, size_t size, size_t prefix_size,uint32_t dts,uint32_t pts)
: Parent(ptr, size, dts, pts, prefix_size) {
_parent_frame = parent_frame;
}
bool cacheAble() const override {
return _parent_frame->cacheAble();
}
private:
Frame::Ptr _parent_frame;
};
/**
*
*/
@ -369,17 +391,18 @@ class FrameFromPtr : public Frame{
public:
typedef std::shared_ptr<FrameFromPtr> Ptr;
FrameFromPtr(CodecId codec_id, char *ptr, size_t size, uint32_t dts, uint32_t pts = 0, size_t prefix_size = 0)
: FrameFromPtr(ptr, size, dts, pts, prefix_size) {
FrameFromPtr(CodecId codec_id, char *ptr, size_t size, uint32_t dts, uint32_t pts = 0, size_t prefix_size = 0,bool is_key = false )
: FrameFromPtr(ptr, size, dts, pts, prefix_size,is_key) {
_codec_id = codec_id;
}
FrameFromPtr(char *ptr, size_t size, uint32_t dts, uint32_t pts = 0, size_t prefix_size = 0){
FrameFromPtr(char *ptr, size_t size, uint32_t dts, uint32_t pts = 0, size_t prefix_size = 0,bool is_key = false){
_ptr = ptr;
_size = size;
_dts = dts;
_pts = pts;
_prefix_size = prefix_size;
_is_key = is_key;
}
char *data() const override{
@ -418,7 +441,7 @@ public:
}
bool keyFrame() const override {
return false;
return _is_key;
}
bool configFrame() const override{
@ -435,6 +458,7 @@ protected:
size_t _size;
size_t _prefix_size;
CodecId _codec_id = CodecInvalid;
bool _is_key;
};
/**
@ -498,6 +522,58 @@ private:
FrameImp::Ptr _buffer;
};
//该类实现frame级别的时间戳覆盖
class FrameStamp : public Frame{
public:
typedef std::shared_ptr<FrameStamp> Ptr;
FrameStamp(const Frame::Ptr &frame, Stamp &stamp,bool modify_stamp){
_frame = frame;
//覆盖时间戳
stamp.revise(frame->dts(), frame->pts(), _dts, _pts, modify_stamp);
}
~FrameStamp() override {}
uint32_t dts() const override{
return (uint32_t)_dts;
}
uint32_t pts() const override{
return (uint32_t)_pts;
}
size_t prefixSize() const override {
return _frame->prefixSize();
}
bool keyFrame() const override {
return _frame->keyFrame();
}
bool configFrame() const override {
return _frame->configFrame();
}
bool cacheAble() const override {
return _frame->cacheAble();
}
char *data() const override {
return _frame->data();
}
size_t size() const override {
return _frame->size();
}
CodecId getCodecId() const override {
return _frame->getCodecId();
}
private:
int64_t _dts;
int64_t _pts;
Frame::Ptr _frame;
};
/**
* Buffer对象转换成可缓存的Frame对象
*/

View File

@ -8,11 +8,11 @@
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <stddef.h>
#include <assert.h>
#include "Rtcp.h"
#include "Util/logger.h"
#include "RtcpFCI.h"
#include "Util/logger.h"
#include <assert.h>
#include <stddef.h>
using namespace std;
using namespace toolkit;
@ -21,37 +21,49 @@ namespace mediakit {
const char *rtcpTypeToStr(RtcpType type) {
switch (type) {
#define SWITCH_CASE(key, value) case RtcpType::key : return #value "(" #key ")";
#define SWITCH_CASE(key, value) \
case RtcpType::key: \
return #value "(" #key ")";
RTCP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return "unknown rtcp pt";
default:
return "unknown rtcp pt";
}
}
const char *sdesTypeToStr(SdesType type) {
switch (type) {
#define SWITCH_CASE(key, value) case SdesType::key : return #value "(" #key ")";
#define SWITCH_CASE(key, value) \
case SdesType::key: \
return #value "(" #key ")";
SDES_TYPE_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return "unknown source description type";
default:
return "unknown source description type";
}
}
const char *psfbTypeToStr(PSFBType type) {
switch (type) {
#define SWITCH_CASE(key, value) case PSFBType::key : return #value "(" #key ")";
#define SWITCH_CASE(key, value) \
case PSFBType::key: \
return #value "(" #key ")";
PSFB_TYPE_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return "unknown payload-specific fb message fmt type";
default:
return "unknown payload-specific fb message fmt type";
}
}
const char *rtpfbTypeToStr(RTPFBType type) {
switch (type) {
#define SWITCH_CASE(key, value) case RTPFBType::key : return #value "(" #key ")";
#define SWITCH_CASE(key, value) \
case RTPFBType::key: \
return #value "(" #key ")";
RTPFB_TYPE_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return "unknown transport layer feedback messages fmt type";
default:
return "unknown transport layer feedback messages fmt type";
}
}
@ -140,7 +152,8 @@ string RtcpHeader::dumpString() const {
return rtcp->dumpString();
}
default: return StrPrinter << dumpHeader() << hexdump((char *)this + sizeof(*this), getSize() - sizeof(*this));
default:
return StrPrinter << dumpHeader() << hexdump((char *)this + sizeof(*this), getSize() - sizeof(*this));
}
}
@ -193,8 +206,22 @@ void RtcpHeader::net2Host(size_t len) {
bye->net2Host(len);
break;
}
default: throw std::runtime_error(StrPrinter << "未处理的rtcp包:" << rtcpTypeToStr((RtcpType) this->pt));
case RtcpType::RTCP_XR: {
RtcpXRRRTR *xr = (RtcpXRRRTR *)this;
if (xr->bt == 4) {
xr->net2Host(len);
// TraceL<<xr->dumpString();
} else if (xr->bt == 5) {
RtcpXRDLRR *dlrr = (RtcpXRDLRR *)this;
dlrr->net2Host(len);
TraceL << dlrr->dumpString();
} else {
throw std::runtime_error(StrPrinter << "rtcp xr bt " << xr->bt << " not support");
}
break;
}
default:
throw std::runtime_error(StrPrinter << "未处理的rtcp包:" << rtcpTypeToStr((RtcpType)this->pt));
}
}
@ -224,19 +251,13 @@ vector<RtcpHeader *> RtcpHeader::loadFromBytes(char *data, size_t len) {
class BufferRtcp : public Buffer {
public:
BufferRtcp(std::shared_ptr<RtcpHeader> rtcp) {
_rtcp = std::move(rtcp);
}
BufferRtcp(std::shared_ptr<RtcpHeader> rtcp) { _rtcp = std::move(rtcp); }
~BufferRtcp() override {}
char *data() const override {
return (char *) _rtcp.get();
}
char *data() const override { return (char *)_rtcp.get(); }
size_t size() const override {
return _rtcp->getSize();
}
size_t size() const override { return _rtcp->getSize(); }
private:
std::shared_ptr<RtcpHeader> _rtcp;
@ -254,9 +275,7 @@ std::shared_ptr<RtcpSR> RtcpSR::create(size_t item_count) {
auto ptr = (RtcpSR *)new char[bytes];
setupHeader(ptr, RtcpType::RTCP_SR, item_count, bytes);
setupPadding(ptr, bytes - real_size);
return std::shared_ptr<RtcpSR>(ptr, [](RtcpSR *ptr) {
delete[] (char *) ptr;
});
return std::shared_ptr<RtcpSR>(ptr, [](RtcpSR *ptr) { delete[] (char *)ptr; });
}
string RtcpSR::getNtpStamp() const {
@ -311,13 +330,15 @@ string RtcpSR::dumpString() const {
#define CHECK_MIN_SIZE(size, kMinSize) \
if (size < kMinSize) { \
throw std::out_of_range(StrPrinter << rtcpTypeToStr((RtcpType)pt) << " 长度不足:" << size << " < " << kMinSize); \
throw std::out_of_range( \
StrPrinter << rtcpTypeToStr((RtcpType)pt) << " 长度不足:" << size << " < " << kMinSize); \
}
#define CHECK_REPORT_COUNT(item_count) \
/*修正个数防止getItemList时内存越界*/ \
if (report_count != item_count) { \
WarnL << rtcpTypeToStr((RtcpType)pt) << " report_count 字段不正确,已修正为:" << (int)report_count << " -> " << item_count; \
WarnL << rtcpTypeToStr((RtcpType)pt) << " report_count 字段不正确,已修正为:" << (int)report_count << " -> " \
<< item_count; \
report_count = item_count; \
}
@ -385,9 +406,7 @@ std::shared_ptr<RtcpRR> RtcpRR::create(size_t item_count) {
auto ptr = (RtcpRR *)new char[bytes];
setupHeader(ptr, RtcpType::RTCP_RR, item_count, bytes);
setupPadding(ptr, bytes - real_size);
return std::shared_ptr<RtcpRR>(ptr, [](RtcpRR *ptr) {
delete[] (char *) ptr;
});
return std::shared_ptr<RtcpRR>(ptr, [](RtcpRR *ptr) { delete[] (char *)ptr; });
}
string RtcpRR::dumpString() const {
@ -473,9 +492,7 @@ std::shared_ptr<RtcpSdes> RtcpSdes::create(const std::vector<string> &item_text)
setupHeader(ptr, RtcpType::RTCP_SDES, item_text.size(), bytes);
setupPadding(ptr, bytes - real_size);
return std::shared_ptr<RtcpSdes>(ptr, [](RtcpSdes *ptr) {
delete[] (char *) ptr;
});
return std::shared_ptr<RtcpSdes>(ptr, [](RtcpSdes *ptr) { delete[] (char *)ptr; });
}
string RtcpSdes::dumpString() const {
@ -527,9 +544,7 @@ std::shared_ptr<RtcpFB> RtcpFB::create_l(RtcpType type, int fmt, const void *fci
}
setupHeader(ptr, type, fmt, bytes);
setupPadding(ptr, bytes - real_size);
return std::shared_ptr<RtcpFB>((RtcpFB *) ptr, [](RtcpFB *ptr) {
delete[] (char *) ptr;
});
return std::shared_ptr<RtcpFB>((RtcpFB *)ptr, [](RtcpFB *ptr) { delete[] (char *)ptr; });
}
std::shared_ptr<RtcpFB> RtcpFB::create(PSFBType fmt, const void *fci, size_t fci_len) {
@ -606,7 +621,9 @@ string RtcpFB::dumpString() const {
}
break;
}
default: /*不可达*/ assert(0); break;
default: /*不可达*/
assert(0);
break;
}
return std::move(printer);
}
@ -639,9 +656,7 @@ std::shared_ptr<RtcpBye> RtcpBye::create(const std::vector<uint32_t> &ssrcs, con
memcpy(reason_len_ptr + 1, reason.data(), *reason_len_ptr);
}
return std::shared_ptr<RtcpBye>(ptr, [](RtcpBye *ptr) {
delete[] (char *) ptr;
});
return std::shared_ptr<RtcpBye>(ptr, [](RtcpBye *ptr) { delete[] (char *)ptr; });
}
vector<uint32_t *> RtcpBye::getSSRC() {
@ -691,6 +706,96 @@ void RtcpBye::net2Host(size_t size) {
}
}
}
////////////////////////////////////////////
string RtcpXRRRTR::dumpString() const {
_StrPrinter printer;
printer << RtcpHeader::dumpHeader();
printer << "ssrc :" << ssrc << "\r\n";
printer << "bt :" << (int)bt << "\r\n";
printer << "block_length : " << block_length << "\r\n";
printer << "ntp msw : " << ntpmsw << "\r\n";
printer << "ntp lsw : " << ntplsw << "\r\n";
return std::move(printer);
}
void RtcpXRRRTR::net2Host(size_t size) {
static const size_t kMinSize = sizeof(RtcpHeader);
CHECK_MIN_SIZE(size, kMinSize);
if (size != sizeof(RtcpXRRRTR)) {
throw std::invalid_argument(
StrPrinter << "rtcp xr Receiver Reference Time Report Block must is " << sizeof(RtcpXRRRTR)
<< " actual size " << size);
}
ssrc = ntohl(ssrc);
block_length = ntohs(block_length);
ntpmsw = ntohl(ntpmsw);
ntplsw = ntohl(ntplsw);
}
string RtcpXRDLRRReportItem::dumpString() const {
_StrPrinter printer;
printer << "ssrc :" << ssrc << "\r\n";
printer << "last RR (lrr) :" << lrr << "\r\n";
printer << "delay since last RR (dlrr): " << dlrr << "\r\n";
return std::move(printer);
}
void RtcpXRDLRRReportItem::net2Host() {
ssrc = ntohl(ssrc);
lrr = ntohl(lrr);
dlrr = ntohl(dlrr);
}
std::vector<RtcpXRDLRRReportItem *> RtcpXRDLRR::getItemList() {
auto count = block_length / 3;
RtcpXRDLRRReportItem *ptr = &items;
vector<RtcpXRDLRRReportItem *> ret;
for (int i = 0; i < (int)count; ++i) {
ret.emplace_back(ptr);
++ptr;
}
return ret;
}
string RtcpXRDLRR::dumpString() const {
_StrPrinter printer;
printer << RtcpHeader::dumpHeader();
printer << "ssrc :" << ssrc << "\r\n";
printer << "bt :" << (int)bt << "\r\n";
printer << "block_length : " << block_length << "\r\n";
auto items_list = ((RtcpXRDLRR *)this)->getItemList();
auto i = 0;
for (auto &item : items_list) {
printer << "---- item:" << i++ << " ----\r\n";
printer << item->dumpString();
}
return std::move(printer);
}
void RtcpXRDLRR::net2Host(size_t size) {
static const size_t kMinSize = sizeof(RtcpHeader);
CHECK_MIN_SIZE(size, kMinSize);
ssrc = ntohl(ssrc);
block_length = ntohs(block_length);
auto count = block_length / 3;
for (int i = 0; i < (int)count; ++i) {
RtcpXRDLRRReportItem *ptr = &items;
ptr->net2Host();
ptr++;
}
}
std::shared_ptr<RtcpXRDLRR> RtcpXRDLRR::create(size_t item_count) {
auto real_size = sizeof(RtcpXRDLRR) - sizeof(RtcpXRDLRRReportItem) + item_count * sizeof(RtcpXRDLRRReportItem);
auto bytes = alignSize(real_size);
auto ptr = (RtcpXRDLRR *)new char[bytes];
setupHeader(ptr, RtcpType::RTCP_XR, 0, bytes);
setupPadding(ptr, bytes - real_size);
return std::shared_ptr<RtcpXRDLRR>(ptr, [](RtcpXRDLRR *ptr) { delete[] (char *)ptr; });
}
#if 0
#include "Util/onceToken.h"

View File

@ -11,11 +11,11 @@
#ifndef ZLMEDIAKIT_RTCP_H
#define ZLMEDIAKIT_RTCP_H
#include "Common/macros.h"
#include "Network/Buffer.h"
#include "Util/util.h"
#include <stdint.h>
#include <vector>
#include "Util/util.h"
#include "Network/Buffer.h"
#include "Common/macros.h"
namespace mediakit {
@ -222,7 +222,6 @@ public:
void setSize(size_t size);
protected:
/**
*
* 使net2Host转换成主机字节序后才可使用此函数
@ -687,6 +686,136 @@ private:
void net2Host(size_t size);
} PACKED;
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|reserved | PT=XR=207 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
: report blocks :
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| BT=4 | reserved | block length = 2 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, most significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, least significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
class RtcpXRRRTR : public RtcpHeader {
public:
friend class RtcpHeader;
uint32_t ssrc;
// 4
uint8_t bt;
uint8_t reserved;
// 2
uint16_t block_length;
// ntp timestamp MSW(in second)
uint32_t ntpmsw;
// ntp timestamp LSW(in picosecond)
uint32_t ntplsw;
private:
/**
*
* 使net2Host转换成主机字节序后才可使用此函数
*/
std::string dumpString() const;
/**
*
* @param size
*/
void net2Host(size_t size);
} PACKED;
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| BT=5 | reserved | block length |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| SSRC_1 (SSRC of first receiver) | sub-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ block
| last RR (LRR) | 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| delay since last RR (DLRR) |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| SSRC_2 (SSRC of second receiver) | sub-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ block
: ... : 2
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*/
class RtcpXRDLRRReportItem {
public:
friend class RtcpXRDLRR;
uint32_t ssrc;
uint32_t lrr;
uint32_t dlrr;
private:
/**
*
* 使net2Host转换成主机字节序后才可使用此函数
*/
std::string dumpString() const;
/**
*
* @param size
*/
void net2Host();
} PACKED;
class RtcpXRDLRR : public RtcpHeader {
public:
friend class RtcpHeader;
uint32_t ssrc;
uint8_t bt;
uint8_t reserved;
uint16_t block_length;
RtcpXRDLRRReportItem items;
/**
* RtcpXRDLRR包RtcpHeader部分()
* @param item_count RtcpXRDLRRReportItem对象个数
* @return RtcpXRDLRR包
*/
static std::shared_ptr<RtcpXRDLRR> create(size_t item_count);
/**
* RtcpXRDLRRReportItem对象指针列表
* 使net2Host转换成主机字节序后才可使用此函数
*/
std::vector<RtcpXRDLRRReportItem *> getItemList();
private:
/**
*
* 使net2Host转换成主机字节序后才可使用此函数
*/
std::string dumpString() const;
/**
*
* @param size
*/
void net2Host(size_t size);
} PACKED;
#if defined(_WIN32)
#pragma pack(pop)
#endif // defined(_WIN32)

View File

@ -14,7 +14,8 @@ using namespace toolkit;
namespace mediakit {
void RtcpContext::onRtp(uint16_t /*seq*/, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t /*sample_rate*/, size_t bytes) {
void RtcpContext::onRtp(
uint16_t /*seq*/, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t /*sample_rate*/, size_t bytes) {
++_packets;
_bytes += bytes;
_last_rtp_stamp = stamp;
@ -45,6 +46,10 @@ Buffer::Ptr RtcpContext::createRtcpRR(uint32_t rtcp_ssrc, uint32_t rtp_ssrc) {
throw std::runtime_error("没有实现, rtp发送者尝试发送rr包");
}
Buffer::Ptr RtcpContext::createRtcpXRDLRR(uint32_t rtcp_ssrc, uint32_t rtp_ssrc) {
throw std::runtime_error("没有实现, rtp发送者尝试发送xr dlrr包");
}
////////////////////////////////////////////////////////////////////////////////////
void RtcpContextForSend::onRtcp(RtcpHeader *rtcp) {
@ -72,7 +77,21 @@ void RtcpContextForSend::onRtcp(RtcpHeader *rtcp) {
}
break;
}
default: break;
case RtcpType::RTCP_XR: {
auto rtcp_xr = (RtcpXRRRTR *)rtcp;
if (rtcp_xr->bt == 4) {
_xr_xrrtr_recv_last_rr[rtcp_xr->ssrc]
= ((rtcp_xr->ntpmsw & 0xFFFF) << 16) | ((rtcp_xr->ntplsw >> 16) & 0xFFFF);
_xr_rrtr_recv_sys_stamp[rtcp_xr->ssrc] = getCurrentMillisecond();
} else if (rtcp_xr->bt == 5) {
TraceL << "for sender not recive dlrr";
} else {
TraceL << "not support xr bt " << rtcp_xr->bt;
}
break;
}
default:
break;
}
}
@ -103,15 +122,45 @@ Buffer::Ptr RtcpContextForSend::createRtcpSR(uint32_t rtcp_ssrc) {
return RtcpHeader::toBuffer(std::move(rtcp));
}
toolkit::Buffer::Ptr RtcpContextForSend::createRtcpXRDLRR(uint32_t rtcp_ssrc, uint32_t rtp_ssrc) {
auto rtcp = RtcpXRDLRR::create(1);
rtcp->bt = 5;
rtcp->reserved = 0;
rtcp->block_length = htons(3);
rtcp->ssrc = htonl(rtcp_ssrc);
rtcp->items.ssrc = htonl(rtp_ssrc);
if (_xr_xrrtr_recv_last_rr.find(rtp_ssrc) == _xr_xrrtr_recv_last_rr.end()) {
rtcp->items.lrr = 0;
WarnL;
} else {
rtcp->items.lrr = htonl(_xr_xrrtr_recv_last_rr[rtp_ssrc]);
}
if (_xr_rrtr_recv_sys_stamp.find(rtp_ssrc) == _xr_rrtr_recv_sys_stamp.end()) {
rtcp->items.dlrr = 0;
WarnL;
} else {
// now - Last SR time,单位毫秒
auto delay = getCurrentMillisecond() - _xr_rrtr_recv_sys_stamp[rtp_ssrc];
// in units of 1/65536 seconds
auto dlsr = (uint32_t)(delay / 1000.0f * 65536);
rtcp->items.dlrr = htonl(dlsr);
}
return RtcpHeader::toBuffer(std::move(rtcp));
}
////////////////////////////////////////////////////////////////////////////////////
void RtcpContextForRecv::onRtp(uint16_t seq, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t sample_rate, size_t bytes) {
void RtcpContextForRecv::onRtp(
uint16_t seq, uint32_t stamp, uint64_t ntp_stamp_ms, uint32_t sample_rate, size_t bytes) {
{
// 接收者才做复杂的统计运算
auto sys_stamp = getCurrentMillisecond();
if (_last_rtp_sys_stamp) {
// 计算时间戳抖动值
double diff = double((int64_t(sys_stamp) - int64_t(_last_rtp_sys_stamp)) * (sample_rate / double(1000.0))
double diff = double(
(int64_t(sys_stamp) - int64_t(_last_rtp_sys_stamp)) * (sample_rate / double(1000.0))
- (int64_t(stamp) - int64_t(_last_rtp_stamp)));
if (diff < 0) {
diff = -diff;
@ -162,7 +211,8 @@ void RtcpContextForRecv::onRtcp(RtcpHeader *rtcp) {
_last_sr_ntp_sys = getCurrentMillisecond();
break;
}
default: break;
default:
break;
}
}

View File

@ -11,9 +11,9 @@
#ifndef ZLMEDIAKIT_RTCPCONTEXT_H
#define ZLMEDIAKIT_RTCPCONTEXT_H
#include <stdint.h>
#include <stddef.h>
#include "Rtcp.h"
#include <stddef.h>
#include <stdint.h>
namespace mediakit {
@ -55,6 +55,13 @@ public:
*/
virtual toolkit::Buffer::Ptr createRtcpSR(uint32_t rtcp_ssrc);
/**
* @brief xr的dlrr包rtt
*
* @return toolkit::Buffer::Ptr
*/
virtual toolkit::Buffer::Ptr createRtcpXRDLRR(uint32_t rtcp_ssrc, uint32_t rtp_ssrc);
/**
* RR rtcp包
* @param rtcp_ssrc rtcp的ssrc
@ -86,8 +93,11 @@ protected:
class RtcpContextForSend : public RtcpContext {
public:
toolkit::Buffer::Ptr createRtcpSR(uint32_t rtcp_ssrc) override;
void onRtcp(RtcpHeader *rtcp) override;
toolkit::Buffer::Ptr createRtcpXRDLRR(uint32_t rtcp_ssrc, uint32_t rtp_ssrc) override;
/**
* rtt
* @param ssrc rtp ssrc
@ -98,6 +108,9 @@ public:
private:
std::map<uint32_t /*ssrc*/, uint32_t /*rtt*/> _rtt;
std::map<uint32_t /*last_sr_lsr*/, uint64_t /*ntp stamp*/> _sender_report_ntp;
std::map<uint32_t /*ssrc*/, uint64_t /*xr rrtr sys stamp*/> _xr_rrtr_recv_sys_stamp;
std::map<uint32_t /*ssrc*/, uint32_t /*last rr */> _xr_xrrtr_recv_last_rr;
};
class RtcpContextForRecv : public RtcpContext {

View File

@ -465,9 +465,11 @@ FCI_TWCC::TwccPacketStatus FCI_TWCC::getPacketChunkList(size_t total_size) const
string FCI_TWCC::dumpString(size_t total_size) const {
_StrPrinter printer;
auto map = getPacketChunkList(total_size);
printer << "twcc fci, base_seq:" << getBaseSeq() << ", pkt_status_count:" << getPacketCount() << ", ref time:" << getReferenceTime() << ", fb count:" << (int)fb_pkt_count << "\n";
printer << "twcc fci, base_seq:" << getBaseSeq() << ", pkt_status_count:" << getPacketCount()
<< ", ref time:" << getReferenceTime() << ", fb count:" << (int)fb_pkt_count << "\n";
for (auto &pr : map) {
printer << "rtp seq:" << pr.first <<", packet status:" << (int)(pr.second.first) << ", delta:" << pr.second.second << "\n";
printer << "rtp seq:" << pr.first << ", packet status:" << (int)(pr.second.first)
<< ", delta:" << pr.second.second << "\n";
}
return std::move(printer);
}
@ -476,10 +478,14 @@ static void appendDeltaString(string &delta_str, FCI_TWCC::TwccPacketStatus &sta
for (auto it = status.begin(); it != status.end() && count--;) {
switch (it->second.first) {
// large delta模式先写高字节再写低字节
case SymbolStatus::large_delta: delta_str.push_back((it->second.second >> 8) & 0xFF);
case SymbolStatus::large_delta:
delta_str.push_back((it->second.second >> 8) & 0xFF);
// small delta模式只写低字节
case SymbolStatus::small_delta: delta_str.push_back(it->second.second & 0xFF); break;
default: break;
case SymbolStatus::small_delta:
delta_str.push_back(it->second.second & 0xFF);
break;
default:
break;
}
// 移除已经处理过的数据
it = status.erase(it);

View File

@ -11,8 +11,8 @@
#ifndef ZLMEDIAKIT_RTCPFCI_H
#define ZLMEDIAKIT_RTCPFCI_H
#include "Rtcp.h"
#include "Common/config.h"
#include "Rtcp.h"
namespace mediakit {
@ -354,7 +354,8 @@ enum class SymbolStatus : uint8_t{
class FCI_TWCC {
public:
static size_t constexpr kSize = 8;
using TwccPacketStatus = std::map<uint16_t/*rtp ext seq*/, std::pair<SymbolStatus, int16_t/*recv delta,单位为250us*/> >;
using TwccPacketStatus
= std::map<uint16_t /*rtp ext seq*/, std::pair<SymbolStatus, int16_t /*recv delta,单位为250us*/>>;
void check(size_t size);
std::string dumpString(size_t total_size) const;
uint16_t getBaseSeq() const;

View File

@ -99,7 +99,11 @@ private:
if (_rtmp_src) {
_rtmp_src->setMetaData(val);
}
if(_demuxer){
return;
}
_demuxer = std::make_shared<RtmpDemuxer>();
//TraceL<<" _wait_track_ready "<<_wait_track_ready;
_demuxer->setTrackListener(this, _wait_track_ready);
_demuxer->loadMetaData(val);
}

View File

@ -28,7 +28,7 @@ RtmpSession::~RtmpSession() {
}
void RtmpSession::onError(const SockException& err) {
bool is_player = !_push_src;
bool is_player = !_push_src_ownership;
uint64_t duration = _ticker.createdTime() / 1000;
WarnP(this) << (is_player ? "RTMP播放器(" : "RTMP推流器(")
<< _media_info._vhost << "/"
@ -219,10 +219,11 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
}
void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
_push_src = nullptr;
//此时回复可能触发broken pipe事件从而直接触发onError回调所以需要先把_push_src置空防止触发断流续推功能
sendStatus({ "level", "status",
"code", "NetStream.Unpublish.Success",
"description", "Stop publishing." });
//_push_src = nullptr;
throw std::runtime_error(StrPrinter << "Stop publishing" << endl);
}

View File

@ -24,6 +24,15 @@
using namespace toolkit;
namespace mediakit {
void Decoder::setOnDecode(Decoder::onDecode cb) {
_on_decode = std::move(cb);
}
void Decoder::setOnStream(Decoder::onStream cb) {
_on_stream = std::move(cb);
}
static Decoder::Ptr createDecoder_l(DecoderImp::Type type) {
switch (type){
case DecoderImp::decoder_ps:

View File

@ -25,12 +25,16 @@ public:
typedef std::function<void(int stream, int codecid, const void *extra, size_t bytes, int finish)> onStream;
virtual ssize_t input(const uint8_t *data, size_t bytes) = 0;
virtual void setOnDecode(onDecode cb) = 0;
virtual void setOnStream(onStream cb) = 0;
void setOnDecode(onDecode cb);
void setOnStream(onStream cb);
protected:
Decoder() = default;
virtual ~Decoder() = default;
protected:
onDecode _on_decode;
onStream _on_stream;
};
class DecoderImp{

View File

@ -53,14 +53,6 @@ ssize_t PSDecoder::input(const uint8_t *data, size_t bytes) {
return bytes;
}
void PSDecoder::setOnDecode(Decoder::onDecode cb) {
_on_decode = std::move(cb);
}
void PSDecoder::setOnStream(Decoder::onStream cb) {
_on_stream = std::move(cb);
}
const char *PSDecoder::onSearchPacketTail(const char *data, size_t len) {
try {
auto ret = ps_demuxer_input(static_cast<struct ps_demuxer_t *>(_ps_demuxer), reinterpret_cast<const uint8_t *>(data), len);

View File

@ -25,8 +25,6 @@ public:
~PSDecoder();
ssize_t input(const uint8_t* data, size_t bytes) override;
void setOnDecode(onDecode cb) override;
void setOnStream(onStream cb) override;
// HttpRequestSplitter interface
private:
@ -36,8 +34,6 @@ private:
private:
void *_ps_demuxer = nullptr;
onDecode _on_decode;
onStream _on_stream;
};
}//namespace mediakit

View File

@ -23,7 +23,7 @@ PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) : MpegMuxer(true
_rtp_encoder = std::make_shared<CommonRtpEncoder>(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0);
_rtp_encoder->setRtpRing(std::make_shared<RtpRing::RingType>());
_rtp_encoder->getRtpRing()->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key){
onRTP(std::move(rtp));
onRTP(std::move(rtp),is_key);
}));
InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc());
}
@ -36,7 +36,7 @@ void PSEncoderImp::onWrite(std::shared_ptr<Buffer> buffer, uint32_t stamp, bool
if (!buffer) {
return;
}
_rtp_encoder->inputFrame(std::make_shared<FrameFromPtr>(buffer->data(), buffer->size(), stamp, stamp));
_rtp_encoder->inputFrame(std::make_shared<FrameFromPtr>(buffer->data(), buffer->size(), stamp, stamp,0,key_pos));
}
}//namespace mediakit

View File

@ -27,7 +27,7 @@ public:
protected:
//rtp打包后回调
virtual void onRTP(toolkit::Buffer::Ptr rtp) = 0;
virtual void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) = 0;
protected:
void onWrite(std::shared_ptr<toolkit::Buffer> buffer, uint32_t stamp, bool key_pos) override;

View File

@ -35,7 +35,7 @@ bool RawEncoderImp::addTrack(const Track::Ptr &track){
_rtp_encoder = createRtpEncoder(track);
_rtp_encoder->setRtpRing(std::make_shared<RtpRing::RingType>());
_rtp_encoder->getRtpRing()->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key){
onRTP(std::move(rtp));
onRTP(std::move(rtp),true);
}));
return true;
}
@ -44,7 +44,7 @@ bool RawEncoderImp::addTrack(const Track::Ptr &track){
_rtp_encoder = createRtpEncoder(track);
_rtp_encoder->setRtpRing(std::make_shared<RtpRing::RingType>());
_rtp_encoder->getRtpRing()->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key){
onRTP(std::move(rtp));
onRTP(std::move(rtp),is_key);
}));
return true;
}

View File

@ -43,10 +43,9 @@ public:
protected:
//rtp打包后回调
virtual void onRTP(toolkit::Buffer::Ptr rtp) = 0;
virtual void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) = 0;
private:
RtpCodec::Ptr createRtpEncoder(const Track::Ptr &track);
uint32_t _ssrc;
uint8_t _payload_type;
bool _sendAudio;

View File

@ -19,25 +19,37 @@ namespace mediakit{
RtpCache::RtpCache(onFlushed cb) {
_cb = std::move(cb);
}
bool RtpCache::firstKeyReady(bool in) {
if(_first_key){
return _first_key;
}
_first_key = in;
return _first_key;
}
void RtpCache::onFlush(std::shared_ptr<List<Buffer::Ptr> > rtp_list, bool) {
_cb(std::move(rtp_list));
}
void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer) {
inputPacket(stamp, true, std::move(buffer), false);
void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer,bool is_key ) {
inputPacket(stamp, true, std::move(buffer), is_key);
}
void RtpCachePS::onRTP(Buffer::Ptr buffer) {
void RtpCachePS::onRTP(Buffer::Ptr buffer,bool is_key) {
if(!firstKeyReady(is_key)){
return;
}
auto rtp = std::static_pointer_cast<RtpPacket>(buffer);
auto stamp = rtp->getStampMS();
input(stamp, std::move(buffer));
input(stamp, std::move(buffer),is_key);
}
void RtpCacheRaw::onRTP(Buffer::Ptr buffer) {
void RtpCacheRaw::onRTP(Buffer::Ptr buffer,bool is_key) {
if(!firstKeyReady(is_key)){
return;
}
auto rtp = std::static_pointer_cast<RtpPacket>(buffer);
auto stamp = rtp->getStampMS();
input(stamp, std::move(buffer));
input(stamp, std::move(buffer),is_key);
}
}//namespace mediakit

View File

@ -30,13 +30,15 @@ protected:
* rtp()
* @param buffer rtp数据
*/
void input(uint64_t stamp, toolkit::Buffer::Ptr buffer);
void input(uint64_t stamp, toolkit::Buffer::Ptr buffer,bool is_key = false);
bool firstKeyReady(bool in);
protected:
void onFlush(std::shared_ptr<toolkit::List<toolkit::Buffer::Ptr> > rtp_list, bool) override;
private:
onFlushed _cb;
bool _first_key = false;
};
class RtpCachePS : public RtpCache, public PSEncoderImp{
@ -45,7 +47,7 @@ public:
~RtpCachePS() override = default;
protected:
void onRTP(toolkit::Buffer::Ptr rtp) override;
void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override;
};
@ -55,7 +57,7 @@ public:
~RtpCacheRaw() override = default;
protected:
void onRTP(toolkit::Buffer::Ptr rtp) override;
void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override;
};
}//namespace mediakit

View File

@ -101,14 +101,6 @@ ssize_t TSDecoder::input(const uint8_t *data, size_t bytes) {
return bytes;
}
void TSDecoder::setOnDecode(Decoder::onDecode cb) {
_on_decode = std::move(cb);
}
void TSDecoder::setOnStream(Decoder::onStream cb) {
_on_stream = std::move(cb);
}
#endif//defined(ENABLE_HLS)
}//namespace mediakit

View File

@ -45,14 +45,10 @@ public:
TSDecoder();
~TSDecoder();
ssize_t input(const uint8_t* data, size_t bytes) override ;
void setOnDecode(onDecode cb) override;
void setOnStream(onStream cb) override;
private:
TSSegment _ts_segment;
struct ts_demuxer_t* _demuxer_ctx = nullptr;
onDecode _on_decode;
onStream _on_stream;
};
#endif//defined(ENABLE_HLS)

View File

@ -60,7 +60,7 @@ RtspSession::~RtspSession() {
}
void RtspSession::onError(const SockException &err) {
bool is_player = !_push_src;
bool is_player = !_push_src_ownership;
uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (is_player ? "RTSP播放器(" : "RTSP推流器(")
<< _media_info._vhost << "/"
@ -867,8 +867,9 @@ void RtspSession::handleReq_Pause(const Parser &parser) {
}
void RtspSession::handleReq_Teardown(const Parser &parser) {
sendRtspResponse("200 OK");
_push_src = nullptr;
//此时回复可能触发broken pipe事件从而直接触发onError回调所以需要先把_push_src置空防止触发断流续推功能
sendRtspResponse("200 OK");
throw SockException(Err_shutdown,"recv teardown request");
}

View File

@ -1,6 +1,6 @@
#include "Util/MD5.h"
#include <atomic>
#include "Util/MD5.h"
#include "Util/logger.h"
#include <atomic>
#include "Packet.hpp"
@ -225,7 +225,35 @@ size_t ControlPacket::size() const {
uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len) {
return loadUint32(buf + 12);
}
std::string HandshakePacket::dump(){
_StrPrinter printer;
printer <<"flag:"<< (int)f<<"\r\n";
printer <<"control_type:"<< (int)control_type<<"\r\n";
printer <<"sub_type:"<< (int)sub_type<<"\r\n";
printer <<"type_specific_info:"<< (int)type_specific_info[0]<<":"<<(int)type_specific_info[1]<<":"<<(int)type_specific_info[2]<<":"<<(int)type_specific_info[3]<<"\r\n";
printer <<"timestamp:"<< timestamp<<"\r\n";
printer <<"dst_socket_id:"<< dst_socket_id<<"\r\n";
printer <<"version:"<< version<<"\r\n";
printer <<"encryption_field:"<< encryption_field<<"\r\n";
printer <<"extension_field:"<< extension_field<<"\r\n";
printer <<"initial_packet_sequence_number:"<< initial_packet_sequence_number<<"\r\n";
printer <<"mtu:"<< mtu<<"\r\n";
printer <<"max_flow_window_size:"<< max_flow_window_size<<"\r\n";
printer <<"handshake_type:"<< handshake_type<<"\r\n";
printer <<"srt_socket_id:"<< srt_socket_id<<"\r\n";
printer <<"syn_cookie:"<< syn_cookie<<"\r\n";
printer <<"peer_ip_addr:";
for(size_t i=0;i<sizeof(peer_ip_addr);++i){
printer<<(int)peer_ip_addr[i]<<":";
}
printer<<"\r\n";
for(size_t i=0;i<ext_list.size();++i){
printer<<ext_list[i]->dump()<<"\r\n";
}
return std::move(printer);
}
bool HandshakePacket::loadFromData(uint8_t *buf, size_t len) {
if (HEADER_SIZE + HS_CONTENT_MIN_SIZE > len) {
ErrorL << "size too smalle " << encryption_field;
@ -435,15 +463,11 @@ uint32_t HandshakePacket::generateSynCookie(
while (true) {
// SYN cookie
char clienthost[NI_MAXHOST];
char clientport[NI_MAXSERV];
getnameinfo(
(struct sockaddr *)addr, sizeof(struct sockaddr_storage), clienthost, sizeof(clienthost), clientport,
sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV);
int64_t timestamp = (DurationCountMicroseconds(SteadyClock::now() - ts) / 60000000) + distractor.load()
+ correction; // secret changes every one minute
std::stringstream cookiestr;
cookiestr << clienthost << ":" << clientport << ":" << timestamp;
cookiestr << SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr)
<< ":" << timestamp;
union {
unsigned char cookie[16];
uint32_t cookie_val;

View File

@ -118,9 +118,9 @@ public:
USERDEFINEDTYPE = 0x7FFF
};
uint32_t sub_type : 16;
uint32_t control_type : 15;
uint32_t f : 1;
uint16_t sub_type;
uint16_t control_type;
uint8_t f;
uint8_t type_specific_info[4];
uint32_t timestamp;
uint32_t dst_socket_id;
@ -189,7 +189,7 @@ public:
static uint32_t getSynCookie(uint8_t *buf, size_t len);
static uint32_t
generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0);
std::string dump();
void assignPeerIP(struct sockaddr_storage *addr);
///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override;

View File

@ -86,6 +86,7 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
handleDataPacket(buf, len, addr);
} else {
WarnL<<"DataPacket switch to other transport: "<<socketId;
switchToOtherTransport(buf, len, socketId, addr);
}
} else {
@ -94,6 +95,7 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
uint16_t type = ControlPacket::getControlType(buf, len);
if (type != ControlPacket::HANDSHAKE && socketId != _socket_id && _socket_id != 0) {
// socket id not same
WarnL<<"ControlPacket: "<< (int)type <<" switch to other transport: "<<socketId;
switchToOtherTransport(buf, len, socketId, addr);
return;
}
@ -168,7 +170,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
if (delay <= 120) {
delay = 120;
}
for (auto ext : pkt.ext_list) {
for (auto& ext : pkt.ext_list) {
// TraceL << getIdentifier() << " ext " << ext->dump();
if (!req) {
req = std::dynamic_pointer_cast<HSExtMessage>(ext);
@ -228,7 +230,10 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) {
HandshakePacket pkt;
assert(pkt.loadFromData(buf, len));
if(!pkt.loadFromData(buf, len)){
WarnL<<"is not vaild HandshakePacket";
return;
}
if (pkt.handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
handleHandshakeInduction(pkt, addr);
@ -236,6 +241,7 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag
handleHandshakeConclusion(pkt, addr);
} else {
WarnL << " not support handshake type = " << pkt.handshake_type;
WarnL <<pkt.dump();
}
_ack_ticker.resetTime(_now);
_nak_ticker.resetTime(_now);
@ -288,13 +294,13 @@ void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *add
bool empty = false;
bool flush = false;
for (auto it : pkt.lost_list) {
for (auto& it : pkt.lost_list) {
if (pkt.lost_list.back() == it) {
flush = true;
}
empty = true;
auto re_list = _send_buf->findPacketBySeq(it.first, it.second - 1);
for (auto pkt : re_list) {
for (auto& pkt : re_list) {
pkt->R = 1;
pkt->storeToHeader();
sendPacket(pkt, flush);
@ -325,7 +331,7 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
return;
}
uint32_t max_seq = 0;
for (auto data : list) {
for (auto& data : list) {
max_seq = data->packet_seq_number;
if (_last_pkt_seq + 1 != data->packet_seq_number) {
TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;
@ -495,7 +501,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
// when no data ok send nack to sender immediately
} else {
uint32_t last_seq;
for (auto data : list) {
for (auto& data : list) {
last_seq = data->packet_seq_number;
if (_last_pkt_seq + 1 != data->packet_seq_number) {
TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;

View File

@ -17,7 +17,7 @@ SrtTransportImp::~SrtTransportImp() {
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false,
Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, !_is_pusher,
static_cast<SockInfo &>(*this));
}
}
@ -149,6 +149,11 @@ std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &
return static_pointer_cast<SockInfo>(getSession());
}
toolkit::EventPoller::Ptr SrtTransportImp::getOwnerPoller(MediaSource &sender){
auto session = getSession();
return session ? session->getPoller() : nullptr;
}
void SrtTransportImp::emitOnPublish() {
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
Broadcast::PublishAuthInvoker invoker = [weak_self](const std::string &err, const ProtocolOption &option) {
@ -282,7 +287,10 @@ std::string SrtTransportImp::getIdentifier() const {
bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
if (_muxer) {
return _muxer->inputFrame(frame);
//TraceL<<"before type "<<frame->getCodecName()<<" dts "<<frame->dts()<<" pts "<<frame->pts();
auto frame_tmp = std::make_shared<FrameStamp>(frame, _type_to_stamp[frame->getTrackType()],false);
//TraceL<<"after type "<<frame_tmp->getCodecName()<<" dts "<<frame_tmp->dts()<<" pts "<<frame_tmp->pts();
return _muxer->inputFrame(frame_tmp);
}
if (_cached_func.size() > 200) {
WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped";
@ -290,11 +298,17 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
}
auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); });
_cached_func.emplace_back([this, frame_cached]() {
//TraceL<<"before type "<<frame_cached->getCodecName()<<" dts "<<frame_cached->dts()<<" pts "<<frame_cached->pts();
auto frame_tmp = std::make_shared<FrameStamp>(frame_cached, _type_to_stamp[frame_cached->getTrackType()],false);
//TraceL<<"after type "<<frame_tmp->getCodecName()<<" dts "<<frame_tmp->dts()<<" pts "<<frame_tmp->pts();
_muxer->inputFrame(frame_tmp);
});
return true;
}
bool SrtTransportImp::addTrack(const Track::Ptr &track) {
_type_to_stamp.emplace(track->getTrackType(),Stamp());
if (_muxer) {
return _muxer->addTrack(track);
}
@ -311,6 +325,9 @@ void SrtTransportImp::addTrackCompleted() {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); });
}
if(_type_to_stamp.size() >1){
_type_to_stamp[TrackType::TrackAudio].syncTo(_type_to_stamp[TrackType::TrackVideo]);
}
}
void SrtTransportImp::doCachedFunc() {

View File

@ -59,6 +59,8 @@ protected:
std::string getOriginUrl(mediakit::MediaSource &sender) const override;
// 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
// get poller
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
///////MediaSinkInterface override///////
void resetTracks() override {};
@ -87,6 +89,8 @@ private:
DecoderImp::Ptr _decoder;
std::recursive_mutex _func_mtx;
std::deque<std::function<void()>> _cached_func;
std::unordered_map<int, Stamp> _type_to_stamp;
};
} // namespace SRT

View File

@ -771,7 +771,6 @@ void RtcSession::loadFrom(const string &str) {
session_name = sdp.getSessionName();
session_info = sdp.getSessionInfo();
connection = sdp.getConnection();
bandwidth = sdp.getBandwidth();
time = sdp.getSessionTime();
msid_semantic = sdp.getItemClass<SdpAttrMsidSemantic>('a', "msid-semantic");
for (auto &media : sdp.medias) {
@ -783,6 +782,7 @@ void RtcSession::loadFrom(const string &str) {
rtc_media.type = mline.type;
rtc_media.port = mline.port;
rtc_media.addr = media.getItemClass<SdpConnection>('c');
rtc_media.bandwidth = media.getItemClass<SdpBandwidth>('b');
rtc_media.ice_ufrag = media.getStringItem('a', "ice-ufrag");
rtc_media.ice_pwd = media.getStringItem('a', "ice-pwd");
rtc_media.role = media.getItemClass<SdpAttrSetup>('a', "setup").role;
@ -1060,9 +1060,6 @@ RtcSessionSdp::Ptr RtcSession::toRtcSessionSdp() const{
if(connection.empty()){
sdp.addItem(std::make_shared<SdpConnection>(connection));
}
if (!bandwidth.empty()) {
sdp.addItem(std::make_shared<SdpBandwidth>(bandwidth));
}
sdp.addAttr(std::make_shared<SdpAttrGroup>(group));
sdp.addAttr(std::make_shared<SdpAttrMsidSemantic>(msid_semantic));
for (auto &m : media) {
@ -1080,6 +1077,9 @@ RtcSessionSdp::Ptr RtcSession::toRtcSessionSdp() const{
}
sdp_media.addItem(std::move(mline));
sdp_media.addItem(std::make_shared<SdpConnection>(m.addr));
if (!m.bandwidth.empty() && m.type != TrackAudio) {
sdp_media.addItem(std::make_shared<SdpBandwidth>(m.bandwidth));
}
if (!m.rtcp_addr.empty()) {
sdp_media.addAttr(std::make_shared<SdpAttrRtcp>(m.rtcp_addr));
}
@ -1631,6 +1631,7 @@ RETRY:
answer_media.proto = offer_media.proto;
answer_media.port = offer_media.port;
answer_media.addr = offer_media.addr;
answer_media.bandwidth = offer_media.bandwidth;
answer_media.rtcp_addr = offer_media.rtcp_addr;
answer_media.rtcp_mux = offer_media.rtcp_mux && configure.rtcp_mux;
answer_media.rtcp_rsize = offer_media.rtcp_rsize && configure.rtcp_rsize;

View File

@ -612,6 +612,7 @@ public:
std::string mid;
uint16_t port{0};
SdpConnection addr;
SdpBandwidth bandwidth;
std::string proto;
RtpDirection direction{RtpDirection::invalid};
std::vector<RtcCodecPlan> plan;
@ -666,7 +667,6 @@ public:
std::string session_info;
SdpTime time;
SdpConnection connection;
SdpBandwidth bandwidth;
SdpAttrMsidSemantic msid_semantic;
std::vector<RtcMedia> media;
SdpAttrGroup group;

View File

@ -9,11 +9,11 @@
*/
#include "WebRtcTransport.h"
#include <iostream>
#include "RtpExt.h"
#include "Rtcp/Rtcp.h"
#include "Rtcp/RtcpFCI.h"
#include "RtpExt.h"
#include "Rtsp/RtpReceiver.h"
#include <iostream>
#define RTP_SSRC_OFFSET 1
#define RTX_SSRC_OFFSET 2
@ -48,6 +48,21 @@ static onceToken token([]() {
static atomic<uint64_t> s_key { 0 };
static void translateIPFromEnv(std::vector<std::string> &v) {
for (auto iter = v.begin(); iter != v.end();) {
if (start_with(*iter, "$")) {
auto ip = toolkit::getEnv(*iter);
if (ip.empty()) {
iter = v.erase(iter);
} else {
*iter++ = ip;
}
} else {
++iter;
}
}
}
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) {
_poller = poller;
_identifier = "zlm_" + to_string(++s_key);
@ -77,7 +92,8 @@ const string &WebRtcTransport::getIdentifier() const {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
void WebRtcTransport::OnIceServerSendStunPacket(
const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
sendSockData((char *)packet->GetData(), packet->GetSize(), tuple);
}
@ -105,16 +121,13 @@ void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void WebRtcTransport::OnDtlsTransportConnected(
const RTC::DtlsTransport *dtlsTransport,
RTC::SrtpSession::CryptoSuite srtpCryptoSuite,
uint8_t *srtpLocalKey,
size_t srtpLocalKeyLen,
uint8_t *srtpRemoteKey,
size_t srtpRemoteKeyLen,
std::string &remoteCert) {
const RTC::DtlsTransport *dtlsTransport, RTC::SrtpSession::CryptoSuite srtpCryptoSuite, uint8_t *srtpLocalKey,
size_t srtpLocalKeyLen, uint8_t *srtpRemoteKey, size_t srtpRemoteKeyLen, std::string &remoteCert) {
InfoL;
_srtp_session_send = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
_srtp_session_recv = std::make_shared<RTC::SrtpSession>(RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
_srtp_session_send = std::make_shared<RTC::SrtpSession>(
RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
_srtp_session_recv = std::make_shared<RTC::SrtpSession>(
RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
#ifdef ENABLE_SCTP
_sctp = std::make_shared<RTC::SctpAssociationImp>(getPoller(), this, 128, 128, 262144, true);
_sctp->TransportConnected();
@ -122,7 +135,8 @@ void WebRtcTransport::OnDtlsTransportConnected(
onStartWebRTC();
}
void WebRtcTransport::OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
void WebRtcTransport::OnDtlsTransportSendData(
const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
sendSockData((char *)data, len, nullptr);
}
@ -140,7 +154,8 @@ void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransp
onShutdown(SockException(Err_shutdown, "dtls close notify received"));
}
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(
const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
#ifdef ENABLE_SCTP
_sctp->ProcessSctpData(data, len);
#else
@ -166,12 +181,13 @@ void WebRtcTransport::OnSctpAssociationClosed(RTC::SctpAssociation* sctpAssociat
InfoL << getIdentifier();
}
void WebRtcTransport::OnSctpAssociationSendData(RTC::SctpAssociation* sctpAssociation, const uint8_t* data, size_t len) {
void WebRtcTransport::OnSctpAssociationSendData(
RTC::SctpAssociation *sctpAssociation, const uint8_t *data, size_t len) {
_dtls_transport->SendApplicationData(data, len);
}
void WebRtcTransport::OnSctpAssociationMessageReceived(RTC::SctpAssociation *sctpAssociation, uint16_t streamId,
uint32_t ppid, const uint8_t *msg, size_t len) {
void WebRtcTransport::OnSctpAssociationMessageReceived(
RTC::SctpAssociation *sctpAssociation, uint16_t streamId, uint32_t ppid, const uint8_t *msg, size_t len) {
InfoL << getIdentifier() << " " << streamId << " " << ppid << " " << len << " " << string((char *)msg, len);
RTC::SctpStreamParameters params;
params.streamId = streamId;
@ -219,7 +235,8 @@ string getFingerprint(const string &algorithm_str, const std::shared_ptr<RTC::Dt
void WebRtcTransport::setRemoteDtlsFingerprint(const RtcSession &remote) {
// 设置远端dtls签名
RTC::DtlsTransport::Fingerprint remote_fingerprint;
remote_fingerprint.algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
remote_fingerprint.algorithm
= RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
remote_fingerprint.value = _offer_sdp->media[0].fingerprint.hash;
_dtls_transport->SetRemoteFingerprint(remote_fingerprint);
}
@ -244,8 +261,8 @@ std::string WebRtcTransport::getAnswerSdp(const string &offer){
fingerprint.algorithm = _offer_sdp->media[0].fingerprint.algorithm;
fingerprint.hash = getFingerprint(fingerprint.algorithm, _dtls_transport);
RtcConfigure configure;
configure.setDefaultSetting(_ice_server->GetUsernameFragment(), _ice_server->GetPassword(),
RtpDirection::sendrecv, fingerprint);
configure.setDefaultSetting(
_ice_server->GetUsernameFragment(), _ice_server->GetPassword(), RtpDirection::sendrecv, fingerprint);
onRtcConfigure(configure);
//// 生成answer sdp ////
@ -349,7 +366,9 @@ void WebRtcTransportImp::onCreate(){
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
GET_CONFIG(float, timeoutSec, RTC::kTimeOutSec);
_timer = std::make_shared<Timer>(timeoutSec / 2, [weak_self]() {
_timer = std::make_shared<Timer>(
timeoutSec / 2,
[weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
@ -358,14 +377,14 @@ void WebRtcTransportImp::onCreate(){
strong_self->onShutdown(SockException(Err_timeout, "接受rtp和rtcp超时"));
}
return true;
}, getPoller());
},
getPoller());
_twcc_ctx.setOnSendTwccCB([this](uint32_t ssrc, string fci) {
onSendTwcc(ssrc, fci);
});
_twcc_ctx.setOnSendTwccCB([this](uint32_t ssrc, string fci) { onSendTwcc(ssrc, fci); });
}
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller) : WebRtcTransport(poller) {
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller)
: WebRtcTransport(poller) {
InfoL << getIdentifier();
}
@ -440,7 +459,8 @@ void WebRtcTransportImp::onStartWebRTC() {
_ssrc_to_track[track->offer_ssrc_rtx] = track;
// rtp pt --> MediaTrack
_pt_to_track.emplace(track->plan_rtp->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtpTrack(track, _twcc_ctx, *this)));
_pt_to_track.emplace(
track->plan_rtp->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtpTrack(track, _twcc_ctx, *this)));
if (track->plan_rtx) {
// rtx pt --> MediaTrack
_pt_to_track.emplace(track->plan_rtx->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtxTrack(track)));
@ -468,9 +488,10 @@ void WebRtcTransportImp::onStartWebRTC() {
// 系统又要有rid这里手工生成rid并为其绑定ssrc
std::string rid = "r" + std::to_string(index);
track->rtp_ext_ctx->setRid(ssrc.ssrc, rid);
if(ssrc.rtx_ssrc)
if (ssrc.rtx_ssrc) {
track->rtp_ext_ctx->setRid(ssrc.rtx_ssrc, rid);
}
}
++index;
}
}
@ -480,8 +501,10 @@ void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
// 修改answer sdp的ip、端口信息
GET_CONFIG_FUNC(std::vector<std::string>, extern_ips, RTC::kExternIP, [](string str) {
std::vector<std::string> ret;
if (str.length())
if (str.length()) {
ret = split(str, ",");
}
translateIPFromEnv(ret);
return ret;
});
for (auto &m : sdp.media) {
@ -528,14 +551,19 @@ void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) {
switch (type) {
case SdpType::answer: onCheckAnswer(sdp); break;
case SdpType::offer: break;
default: /*不可达*/ assert(0); break;
case SdpType::answer:
onCheckAnswer(sdp);
break;
case SdpType::offer:
break;
default: /*不可达*/
assert(0);
break;
}
}
SdpAttrCandidate::Ptr makeIceCandidate(std::string ip, uint16_t port,
uint32_t priority = 100, std::string proto = "udp") {
SdpAttrCandidate::Ptr
makeIceCandidate(std::string ip, uint16_t port, uint32_t priority = 100, std::string proto = "udp") {
auto candidate = std::make_shared<SdpAttrCandidate>();
// rtp端口
candidate->component = 1;
@ -556,15 +584,16 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
// 添加接收端口candidate信息
GET_CONFIG_FUNC(std::vector<std::string>, extern_ips, RTC::kExternIP, [](string str) {
std::vector<std::string> ret;
if (str.length())
if (str.length()) {
ret = split(str, ",");
}
translateIPFromEnv(ret);
return ret;
});
if (extern_ips.empty()) {
std::string localIp = SockUtil::get_local_ip();
configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp"));
}
else {
} else {
const uint32_t delta = 10;
uint32_t priority = 100 + delta * extern_ips.size();
for (auto ip : extern_ips) {
@ -574,20 +603,18 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
}
}
///////////////////////////////////////////////////////////////////
class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this<RtpChannel> {
class RtpChannel
: public RtpTrackImp
, public std::enable_shared_from_this<RtpChannel> {
public:
RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
_poller = std::move(poller);
_on_nack = std::move(on_nack);
setOnSorted(std::move(cb));
_nack_ctx.setOnNack([this](const FCI_NACK &nack) {
onNack(nack);
});
_nack_ctx.setOnNack([this](const FCI_NACK &nack) { onNack(nack); });
}
~RtpChannel() override = default;
@ -752,11 +779,30 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
});
break;
}
default: break;
default:
break;
}
break;
}
default: break;
case RtcpType::RTCP_XR: {
RtcpXRRRTR *xr = (RtcpXRRRTR *)rtcp;
if (xr->bt != 4) {
break;
}
auto it = _ssrc_to_track.find(xr->ssrc);
if (it == _ssrc_to_track.end()) {
WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
return;
}
auto &track = it->second;
track->rtcp_context_send->onRtcp(rtcp);
auto xrdlrr = track->rtcp_context_send->createRtcpXRDLRR(track->answer_ssrc_rtp, track->answer_ssrc_rtp);
sendRtcpPacket(xrdlrr->data(), xrdlrr->size(), true);
break;
}
default:
break;
}
}
}
@ -767,9 +813,9 @@ void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, Medi
// rid --> RtpReceiverImp
auto &ref = track.rtp_channel[rid];
weak_ptr<WebRtcTransportImp> weak_self = dynamic_pointer_cast<WebRtcTransportImp>(shared_from_this());
ref = std::make_shared<RtpChannel>(getPoller(), [&track, this, rid](RtpPacket::Ptr rtp) mutable {
onSortedRtp(track, rid, std::move(rtp));
}, [&track, weak_self, ssrc](const FCI_NACK &nack) mutable {
ref = std::make_shared<RtpChannel>(
getPoller(), [&track, this, rid](RtpPacket::Ptr rtp) mutable { onSortedRtp(track, rid, std::move(rtp)); },
[&track, weak_self, ssrc](const FCI_NACK &nack) mutable {
// nack发送可能由定时器异步触发
auto strong_self = weak_self.lock();
if (strong_self) {
@ -833,7 +879,8 @@ void WrappedRtxTrack::inputRtp(const char *buf, size_t len, uint64_t stamp_ms, R
auto &ref = track->rtp_channel[rid];
if (!ref) {
// 再接收到对应的rtp前丢弃rtx包
WarnL << "unknown rtx rtp, rid:" << rid << ", ssrc:" << ntohl(rtp->ssrc) << ", codec:" << track->plan_rtp->codec << ", seq:" << ntohs(rtp->seq);
WarnL << "unknown rtx rtp, rid:" << rid << ", ssrc:" << ntohl(rtp->ssrc) << ", codec:" << track->plan_rtp->codec
<< ", seq:" << ntohs(rtp->seq);
return;
}
@ -900,7 +947,9 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool r
}
if (!rtx) {
// 统计rtp发送情况好做sr汇报
track->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp->size() - RtpPacket::kRtpTcpHeaderSize);
track->rtcp_context_send->onRtp(
rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate,
rtp->size() - RtpPacket::kRtpTcpHeaderSize);
track->nack_list.pushBack(rtp);
#if 0
//此处模拟发送丢包
@ -970,8 +1019,9 @@ void WebRtcTransportImp::onShutdown(const SockException &ex){
void WebRtcTransportImp::setSession(Session::Ptr session) {
_history_sessions.emplace(session.get(), session);
if (_selected_session) {
InfoL << "rtc network changed: " << _selected_session->get_peer_ip() << ":" << _selected_session->get_peer_port()
<< " -> " << session->get_peer_ip() << ":" << session->get_peer_port() << ", id:" << getIdentifier();
InfoL << "rtc network changed: " << _selected_session->get_peer_ip() << ":"
<< _selected_session->get_peer_port() << " -> " << session->get_peer_ip() << ":"
<< session->get_peer_port() << ", id:" << getIdentifier();
}
_selected_session = std::move(session);
unrefSelf();
@ -1044,8 +1094,8 @@ void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) {
_map_creator[type] = std::move(cb);
}
void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args,
const onCreateRtc &cb) {
void WebRtcPluginManager::getAnswerSdp(
Session &sender, const string &type, const string &offer, const WebRtcArgs &args, const onCreateRtc &cb) {
lock_guard<mutex> lck(_mtx_creator);
auto it = _map_creator.find(type);
if (it == _map_creator.end()) {
@ -1055,17 +1105,20 @@ void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, cons
it->second(sender, offer, args, cb);
}
#include "WebRtcEchoTest.h"
#include "WebRtcPlayer.h"
#include "WebRtcPusher.h"
#include "WebRtcEchoTest.h"
void echo_plugin(Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
void echo_plugin(
Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
cb(*WebRtcEchoTest::create(EventPollerPool::Instance().getPoller()));
}
void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
void push_plugin(
Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
MediaInfo info(args["url"]);
Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp, info](const string &err, const ProtocolOption &option) mutable {
Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp,
info](const string &err, const ProtocolOption &option) mutable {
if (!err.empty()) {
cb(WebRtcException(SockException(Err_other, err)));
return;
@ -1104,20 +1157,23 @@ void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &arg
push_src_ownership = push_src->getOwnership();
push_src->setProtocolOption(option);
}
auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option);
auto rtc
= WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option);
push_src->setListener(rtc);
cb(*rtc);
};
// rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtc_push, info, invoker, static_cast<SockInfo &>(sender));
auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPublish, MediaOriginType::rtc_push, info, invoker, static_cast<SockInfo &>(sender));
if (!flag) {
// 该事件无人监听,默认不鉴权
invoker("", ProtocolOption());
}
}
void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
void play_plugin(
Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
MediaInfo info(args["url"]);
auto session_ptr = sender.shared_from_this();
Broadcast::AuthInvoker invoker = [cb, offer_sdp, info, session_ptr](const string &err) mutable {
@ -1142,7 +1198,8 @@ void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &arg
};
// 广播通用播放url鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, invoker, static_cast<SockInfo &>(sender));
auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPlayed, info, invoker, static_cast<SockInfo &>(sender));
if (!flag) {
// 该事件无人监听,默认不鉴权
invoker("");

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long