Refine: 优化tcp/udp服务器异常管理断开机制

This commit is contained in:
ziyue 2021-11-19 15:26:33 +08:00
parent 7e6467615c
commit afdc5e4267
7 changed files with 33 additions and 63 deletions

@ -1 +1 @@
Subproject commit 60eb96ded0493fb368dd907997211b6beffb6777
Subproject commit f4cae4bbffe1e58392a0c7201f6d501a48141289

View File

@ -75,12 +75,7 @@ ssize_t HttpSession::onRecvHeader(const char *header,size_t len) {
//默认后面数据不是content而是header
ssize_t content_len = 0;
auto &fun = it->second;
try {
(this->*fun)(content_len);
}catch (exception &ex){
shutdown(SockException(Err_shutdown,ex.what()));
}
(this->*(it->second))(content_len);
//清空解析器节省内存
_parser.Clear();

View File

@ -60,12 +60,8 @@ void RtmpSession::onManager() {
void RtmpSession::onRecv(const Buffer::Ptr &buf) {
_ticker.resetTime();
try {
_total_bytes += buf->size();
onParseRtmp(buf->data(), buf->size());
} catch (exception &ex) {
shutdown(SockException(Err_shutdown, ex.what()));
}
_total_bytes += buf->size();
onParseRtmp(buf->data(), buf->size());
}
void RtmpSession::onCmd_connect(AMFDecoder &dec) {

View File

@ -45,17 +45,11 @@ RtpSession::~RtpSession() {
}
void RtpSession::onRecv(const Buffer::Ptr &data) {
try {
if (_is_udp) {
onRtpPacket(data->data(), data->size());
return;
}
RtpSplitter::input(data->data(), data->size());
} catch (SockException &ex) {
shutdown(ex);
} catch (std::exception &ex) {
shutdown(SockException(Err_other, ex.what()));
if (_is_udp) {
onRtpPacket(data->data(), data->size());
return;
}
RtpSplitter::input(data->data(), data->size());
}
void RtpSession::onError(const SockException &err) {

View File

@ -125,46 +125,36 @@ void RtspSession::onRecv(const Buffer::Ptr &buf) {
void RtspSession::onWholeRtspPacket(Parser &parser) {
string method = parser.Method(); //提取出请求命令字
_cseq = atoi(parser["CSeq"].data());
if(_content_base.empty() && method != "GET"){
if (_content_base.empty() && method != "GET") {
_content_base = parser.Url();
_media_info.parse(parser.FullUrl());
_media_info._schema = RTSP_SCHEMA;
}
typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
using rtsp_request_handler = void (RtspSession::*)(const Parser &parser);
static unordered_map<string, rtsp_request_handler> s_cmd_functions;
static onceToken token( []() {
s_cmd_functions.emplace("OPTIONS",&RtspSession::handleReq_Options);
s_cmd_functions.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
s_cmd_functions.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
s_cmd_functions.emplace("RECORD",&RtspSession::handleReq_RECORD);
s_cmd_functions.emplace("SETUP",&RtspSession::handleReq_Setup);
s_cmd_functions.emplace("PLAY",&RtspSession::handleReq_Play);
s_cmd_functions.emplace("PAUSE",&RtspSession::handleReq_Pause);
s_cmd_functions.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
s_cmd_functions.emplace("GET",&RtspSession::handleReq_Get);
s_cmd_functions.emplace("POST",&RtspSession::handleReq_Post);
s_cmd_functions.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {});
static onceToken token([]() {
s_cmd_functions.emplace("OPTIONS", &RtspSession::handleReq_Options);
s_cmd_functions.emplace("DESCRIBE", &RtspSession::handleReq_Describe);
s_cmd_functions.emplace("ANNOUNCE", &RtspSession::handleReq_ANNOUNCE);
s_cmd_functions.emplace("RECORD", &RtspSession::handleReq_RECORD);
s_cmd_functions.emplace("SETUP", &RtspSession::handleReq_Setup);
s_cmd_functions.emplace("PLAY", &RtspSession::handleReq_Play);
s_cmd_functions.emplace("PAUSE", &RtspSession::handleReq_Pause);
s_cmd_functions.emplace("TEARDOWN", &RtspSession::handleReq_Teardown);
s_cmd_functions.emplace("GET", &RtspSession::handleReq_Get);
s_cmd_functions.emplace("POST", &RtspSession::handleReq_Post);
s_cmd_functions.emplace("SET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER);
s_cmd_functions.emplace("GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER);
});
auto it = s_cmd_functions.find(method);
if (it == s_cmd_functions.end()) {
sendRtspResponse("403 Forbidden");
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
return;
throw SockException(Err_shutdown, StrPrinter << "403 Forbidden:" << method);
}
auto &fun = it->second;
try {
(this->*fun)(parser);
}catch (SockException &ex){
if(ex){
shutdown(ex);
}
}catch (exception &ex){
shutdown(SockException(Err_shutdown,ex.what()));
}
(this->*(it->second))(parser);
parser.Clear();
}
@ -979,7 +969,13 @@ void RtspSession::startListenPeerUdpData(int track_idx) {
if (!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(interleaved, buf, addr);
try {
strongSelf->onRcvPeerUdpData(interleaved, buf, addr);
} catch (SockException &ex) {
strongSelf->shutdown(ex);
} catch (std::exception &ex) {
strongSelf->shutdown(SockException(Err_other, ex.what()));
}
});
return true;
};

View File

@ -51,14 +51,6 @@ WebRtcSession::~WebRtcSession() {
}
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
try {
onRecv_l(buffer);
} catch (std::exception &ex) {
shutdown(SockException(Err_shutdown, ex.what()));
}
}
void WebRtcSession::onRecv_l(const Buffer::Ptr &buffer) {
if (_find_transport) {
//只允许寻找一次transport
_find_transport = false;

View File

@ -30,9 +30,6 @@ public:
static EventPoller::Ptr queryPoller(const Buffer::Ptr &buffer);
private:
void onRecv_l(const Buffer::Ptr &);
private:
std::string _identifier;
bool _find_transport = true;