Compare commits

..

4 Commits

Author SHA1 Message Date
alex
9fe3f5d2ff
Merge 610ac62e90 into 51f49d3a89 2024-11-12 19:14:05 +08:00
xiongguangjie
51f49d3a89
Fix mp4 record segment bug (#4008 #4007)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2024-11-09 19:33:36 +08:00
286897655
8e823b3b74
提升rtsp协议兼容性 (#4003)
海康解码器播放rtsp流url带了?key=value参数,SETUP时会带着这些参数,导致获取trackid失败
2024-11-09 19:32:18 +08:00
xia-chu
ef11c66fb8 新增语音对讲接口(startSendRtpTalk) 2024-11-09 19:29:04 +08:00
8 changed files with 226 additions and 60 deletions

View File

@ -2161,6 +2161,81 @@
},
"response": []
},
{
"name": "开始双向对讲(startSendRtpTalk)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/startSendRtpTalk?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&ssrc=1&recv_stream_id=",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"startSendRtpTalk"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置)"
},
{
"key": "vhost",
"value": "{{defaultVhost}}",
"description": "虚拟主机例如__defaultVhost__"
},
{
"key": "app",
"value": "rtp",
"description": "应用名,例如 rtp"
},
{
"key": "stream",
"value": "rtc",
"description": "流id例如webrtc推流上来的流id"
},
{
"key": "ssrc",
"value": "1",
"description": "rtp推流出去的ssrc"
},
{
"key": "recv_stream_id",
"value": "",
"description": "对方rtp推流上来的流id我们将通过这个链接回复他rtp流请注意两个流的app和vhost需一致"
},
{
"key": "from_mp4",
"value": "0",
"description": "是否推送本地MP4录像该参数非必选参数",
"disabled": true
},
{
"key": "type",
"value": "1",
"description": "0(ES流)、1(PS流)、2(TS流)默认1(PS流);该参数非必选参数",
"disabled": true
},
{
"key": "pt",
"value": "96",
"description": "rtp payload type默认96该参数非必选参数",
"disabled": true
},
{
"key": "only_audio",
"value": "1",
"description": "rtp es方式打包时是否只打包音频该参数非必选参数",
"disabled": true
}
]
}
},
"response": []
},
{
"name": "停止 发送rtp(stopSendRtp)",
"request": {

View File

@ -1606,6 +1606,41 @@ void installWebApi() {
start_send_rtp(true, API_ARGS_VALUE, invoker);
});
api_regist("/index/api/startSendRtpTalk",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream", "ssrc", "recv_stream_id");
auto src = MediaSource::find(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["from_mp4"].as<int>());
if (!src) {
throw ApiRetException("can not find the source stream", API::NotFound);
}
MediaSourceEvent::SendRtpArgs args;
args.con_type = mediakit::MediaSourceEvent::SendRtpArgs::kVoiceTalk;
args.ssrc = allArgs["ssrc"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.data_type = allArgs["type"].empty() ? MediaSourceEvent::SendRtpArgs::kRtpPS : (MediaSourceEvent::SendRtpArgs::DataType)(allArgs["type"].as<int>());
args.only_audio = allArgs["only_audio"].as<bool>();
args.recv_stream_id = allArgs["recv_stream_id"];
args.recv_stream_app = allArgs["app"];
args.recv_stream_vhost = allArgs["vhost"];
src->getOwnerPoller()->async([=]() mutable {
try {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
}
val["local_port"] = local_port;
invoker(200, headerOut, val.toStyledString());
});
} catch (std::exception &ex) {
val["code"] = API::Exception;
val["msg"] = ex.what();
invoker(200, headerOut, val.toStyledString());
}
});
});
api_regist("/index/api/listRtpSender",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream");

View File

@ -124,7 +124,8 @@ public:
kTcpActive = 0, // tcp主动模式tcp客户端主动连接对方并发送rtp
kUdpActive = 1, // udp主动模式主动发送数据给对方
kTcpPassive = 2, // tcp被动模式tcp服务器等待对方连接并回复rtp
kUdpPassive = 3 // udp被动方式等待对方发送nat打洞包然后回复rtp至打洞包源地址
kUdpPassive = 3, // udp被动方式等待对方发送nat打洞包然后回复rtp至打洞包源地址
kVoiceTalk = 4, // 语音对讲模式对方必须想推流上来通过他的推流链路再回复rtp数据
};
// rtp类型 [AUTO-TRANSLATED:acca40ab]

View File

@ -123,11 +123,15 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
if (!(_have_video && frame->getTrackType() == TrackAudio)) {
// 如果有视频且输入的是音频,那么应该忽略切片逻辑 [AUTO-TRANSLATED:fbb15d93]
// If there is video and the input is audio, then the slice logic should be ignored
if (_last_dts == 0 || _last_dts > frame->dts()) {
if (_last_dts == 0) {
// first frame assign dts
_last_dts = frame->dts();
} else if (_last_dts > frame->dts()) {
// b帧情况下dts时间戳可能回退 [AUTO-TRANSLATED:1de38f77]
// In the case of b-frames, the dts timestamp may regress
_last_dts = MIN(frame->dts(), _last_dts);
}
auto duration = 5u; // 默认至少一帧5ms
if (frame->dts() > 0 && frame->dts() > _last_dts) {
duration = MAX(duration, frame->dts() - _last_dts);

View File

@ -346,5 +346,9 @@ float RtpProcess::getLossRate(MediaSource &sender, TrackType type) {
return getLostInterval() * 100 / expected;
}
const toolkit::Socket::Ptr& RtpProcess::getSock() const {
return _sock;
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)

View File

@ -102,6 +102,8 @@ public:
uint16_t get_peer_port() override;
std::string getIdentifier() const override;
const toolkit::Socket::Ptr& getSock() const;
protected:
bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr & track) override;

View File

@ -190,6 +190,25 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct
}
}, delay_ms / 1000.0, "::", args.src_port);
InfoL << "start tcp active send rtp to: " << args.dst_url << ":" << args.dst_port;
} else if (args.con_type == MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
auto src = MediaSource::find(args.recv_stream_vhost, args.recv_stream_app, args.recv_stream_id);
if (!src) {
cb(0, SockException(Err_other, "can not find the target stream"));
return;
}
auto processor = src->getRtpProcess();
if (!processor) {
cb(0, SockException(Err_other, "get rtp processor from target stream failed"));
return;
}
auto sock = processor->getSock();
if (!sock) {
cb(0, SockException(Err_other, "get sock from rtp processor failed"));
return;
}
_socket_rtp = std::move(sock);
onConnect();
cb(_socket_rtp->get_local_port(), SockException());
} else {
CHECK(0, "invalid con type");
}
@ -249,48 +268,51 @@ void RtpSender::onConnect() {
// 加大发送缓存,防止udp丢包之类的问题 [AUTO-TRANSLATED:6e1cb40a]
// Increase the send buffer to prevent problems such as UDP packet loss
SockUtil::setSendBuf(_socket_rtp->rawFD(), 4 * 1024 * 1024);
if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) {
// 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378]
// Close TCP no_delay and enable MSG_MORE to improve sending performance
SockUtil::setNoDelay(_socket_rtp->rawFD(), false);
_socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
} else if (_args.udp_rtcp_timeout) {
createRtcpSocket();
}
// 连接建立成功事件 [AUTO-TRANSLATED:ac279c86]
// Connection established successfully event
weak_ptr<RtpSender> weak_self = shared_from_this();
if (!_args.recv_stream_id.empty()) {
mINI ini;
ini[RtpSession::kStreamID] = _args.recv_stream_id;
// 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663]
// Force synchronization of the app and vhost of the receive stream and send stream
ini[RtpSession::kApp] = _args.recv_stream_app;
ini[RtpSession::kVhost] = _args.recv_stream_vhost;
_rtp_session = std::make_shared<RtpSession>(_socket_rtp);
_rtp_session->setParams(ini);
_socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) {
// 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378]
// Close TCP no_delay and enable MSG_MORE to improve sending performance
SockUtil::setNoDelay(_socket_rtp->rawFD(), false);
_socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
} else if (_args.udp_rtcp_timeout) {
createRtcpSocket();
}
// 连接建立成功事件 [AUTO-TRANSLATED:ac279c86]
// Connection established successfully event
weak_ptr<RtpSender> weak_self = shared_from_this();
if (!_args.recv_stream_id.empty()) {
mINI ini;
ini[RtpSession::kStreamID] = _args.recv_stream_id;
// 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663]
// Force synchronization of the app and vhost of the receive stream and send stream
ini[RtpSession::kApp] = _args.recv_stream_app;
ini[RtpSession::kVhost] = _args.recv_stream_vhost;
_rtp_session = std::make_shared<RtpSession>(_socket_rtp);
_rtp_session->setParams(ini);
_socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
try {
strong_self->_rtp_session->onRecv(buf);
} catch (std::exception &ex) {
SockException err(toolkit::Err_shutdown, ex.what());
strong_self->_rtp_session->shutdown(err);
}
});
} else {
_socket_rtp->setOnRead(nullptr);
}
_socket_rtp->setOnErr([weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
try {
strong_self->_rtp_session->onRecv(buf);
} catch (std::exception &ex) {
SockException err(toolkit::Err_shutdown, ex.what());
strong_self->_rtp_session->shutdown(err);
if (strong_self) {
strong_self->onErr(err);
}
});
} else {
_socket_rtp->setOnRead(nullptr);
}
_socket_rtp->setOnErr([weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onErr(err);
}
});
InfoL << "startSend rtp success: " << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", data_type: " << _args.data_type << ", con_type: " << _args.con_type;
}
@ -378,28 +400,51 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr>> rtp_list) {
return;
}
size_t i = 0;
auto size = rtp_list->size();
rtp_list->for_each([&](Buffer::Ptr &packet) {
switch (_args.con_type) {
case MediaSourceEvent::SendRtpArgs::kUdpActive:
case MediaSourceEvent::SendRtpArgs::kUdpPassive: {
onSendRtpUdp(packet, i == 0);
// udp模式rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b]
// UDP mode, the first 4 bytes of rtp over tcp can be ignored
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
break;
auto send_func = [this](const shared_ptr<List<Buffer::Ptr>> &rtp_list) {
size_t i = 0;
auto size = rtp_list->size();
rtp_list->for_each([&](Buffer::Ptr &packet) {
switch (_args.con_type) {
case MediaSourceEvent::SendRtpArgs::kUdpActive:
case MediaSourceEvent::SendRtpArgs::kUdpPassive: {
onSendRtpUdp(packet, i == 0);
// udp模式rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b]
// UDP mode, the first 4 bytes of rtp over tcp can be ignored
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
break;
}
case MediaSourceEvent::SendRtpArgs::kTcpActive:
case MediaSourceEvent::SendRtpArgs::kTcpPassive: {
// tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a]
// TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
break;
}
case MediaSourceEvent::SendRtpArgs::kVoiceTalk: {
auto type = _socket_rtp->alive() ? _socket_rtp->sockType() : SockNum::Sock_Invalid;
if (type == SockNum::Sock_UDP) {
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
} else if (type == SockNum::Sock_TCP) {
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
} else {
onErr(SockException(Err_other, "dst socket disconnected"));
}
break;
}
default: CHECK(0);
}
case MediaSourceEvent::SendRtpArgs::kTcpActive:
case MediaSourceEvent::SendRtpArgs::kTcpPassive: {
// tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a]
// TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
break;
});
};
if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
weak_ptr<RtpSender> weak_self = shared_from_this();
_socket_rtp->getPoller()->async([weak_self, rtp_list, send_func]() {
if (auto strong_self = weak_self.lock()) {
send_func(rtp_list);
}
default: CHECK(0);
}
});
});
} else {
send_func(rtp_list);
}
}
void RtpSender::onErr(const SockException &ex) {

View File

@ -1136,7 +1136,7 @@ int RtspSession::getTrackIndexByTrackType(TrackType type) {
int RtspSession::getTrackIndexByControlUrl(const string &control_url) {
for (size_t i = 0; i < _sdp_track.size(); ++i) {
if (control_url == _sdp_track[i]->getControlUrl(_content_base)) {
if (control_url.find(_sdp_track[i]->getControlUrl(_content_base)) == 0) {
return i;
}
}