Optimize cmake file.

This commit is contained in:
amass 2023-12-03 23:42:58 +08:00
parent 10b20c2512
commit 5fb614342d
13 changed files with 16 additions and 864 deletions

View File

@ -1,94 +0,0 @@
#ifndef BASICSOCKET_H
#define BASICSOCKET_H
#include "Message.h"
#include "Options.h"
#include <boost/asio/buffer.hpp>
#include <boost/asio/execution_context.hpp>
#include <boost/range/metafunctions.hpp>
#include <zmq.h>
namespace ZeroMQ {
template <typename Service>
class BasicSocket {
public:
using ImplementationType = typename Service::ImplementationType;
BasicSocket(boost::asio::io_context &context, SocketType type);
void connect(std::string_view address, boost::system::error_code &error);
void connect(std::string_view address);
/**
* @brief only see the instance is nullptr or not.
*/
bool connected() const;
void bind(std::string_view address, boost::system::error_code &error);
void bind(std::string_view address);
template <int Option, class T, bool BoolUnit>
void setOption(IntegralOption<Option, T, BoolUnit>, const T &value, boost::system::error_code &error);
template <int Option, class T, bool BoolUnit>
void setOption(IntegralOption<Option, T, BoolUnit>, const T &value);
template <int Option, int NullTerm>
void setOption(ArrayOption<Option, NullTerm>, const std::string_view &buffer, boost::system::error_code &error);
template <int Option, class T, bool BoolUnit>
T option(IntegralOption<Option, T, BoolUnit>, boost::system::error_code &error) const;
template <int Option, class T, bool BoolUnit>
T option(IntegralOption<Option, T, BoolUnit>) const;
size_t send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error);
template <typename ConstBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error);
template <typename ConstBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
send(const ConstBufferSequence &buffers, SendFlags flags = SendFlags::None);
size_t send(Message &&message, SendFlags flags, boost::system::error_code &error);
size_t send(Message &&message, SendFlags flags);
std::size_t receive(Message &message, RecvFlags flags, boost::system::error_code &error);
std::size_t receive(Message &message, RecvFlags flags = RecvFlags::None);
size_t receive(boost::asio::mutable_buffer buffer, RecvFlags flags, boost::system::error_code &error);
size_t receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags = RecvFlags::None);
template <typename MutableBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error);
template <typename MutableBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
receive(const MutableBufferSequence &buffers, RecvFlags flags = RecvFlags::None);
// void read_handler(const boost::system::error_code& ec,std::size_t bytes_transferred)
template <typename ReadHandler>
void asyncReceive(Message &message, ReadHandler &&handler);
template <typename MutableBufferSequence, typename ReadHandler>
void asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler);
template <bool CheckN, class OutputIt>
size_t receiveMultipart(OutputIt &out, size_t n, RecvFlags flags, boost::system::error_code &error);
template <class OutputIt, typename ReadHandler>
void asyncReceiveMultipart(OutputIt out, ReadHandler &&handler);
boost::asio::io_context &ioContext() const;
~BasicSocket();
private:
Service &m_service;
ImplementationType m_impl;
};
} // namespace ZeroMQ
#include "BasicSocket.inl"
#endif // BASICSOCKET_H

View File

@ -1,331 +0,0 @@
#ifndef BASICSOCKET_INL
#define BASICSOCKET_INL
#include "BasicSocket.h"
#include "ErrorCode.h"
#include "SocketService.h"
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/post.hpp>
#include <zmq.h>
#include "BoostLog.h"
namespace ZeroMQ {
template <typename Service>
BasicSocket<Service>::BasicSocket(boost::asio::io_context &context, SocketType type)
: m_service(boost::asio::use_service<Service>(context)) {
m_service.construct(m_impl, type);
}
template <typename Service>
BasicSocket<Service>::~BasicSocket() {
m_service.destroy(m_impl);
}
template <typename Service>
void BasicSocket<Service>::connect(std::string_view address, boost::system::error_code &error) {
m_service.connect(m_impl, std::move(address), error);
}
template <typename Service>
void BasicSocket<Service>::connect(std::string_view address) {
boost::system::error_code error;
connect(std::move(address), error);
if (error) throw error;
}
template <typename Service>
bool BasicSocket<Service>::connected() const {
return m_impl != nullptr && m_impl->socket != nullptr;
}
template <typename Service>
void BasicSocket<Service>::bind(std::string_view address, boost::system::error_code &error) {
std::lock_guard lock_guard(m_impl->mutex);
auto status = zmq_bind(m_impl->socket, address.data());
if (status < 0) {
error = makeErrorCode();
return;
}
}
template <typename Service>
void BasicSocket<Service>::bind(std::string_view address) {
boost::system::error_code error;
bind(std::move(address), error);
if (error) throw error;
}
template <typename Service>
template <int Option, class T, bool BoolUnit>
void BasicSocket<Service>::setOption(IntegralOption<Option, T, BoolUnit>, const T &value,
boost::system::error_code &error) {
static_assert(std::is_integral<T>::value, "T must be integral");
m_service.setOption(m_impl, Option, &value, sizeof(value), error);
}
template <typename Service>
template <int Option, class T, bool BoolUnit>
void BasicSocket<Service>::setOption(IntegralOption<Option, T, BoolUnit>, const T &value) {
boost::system::error_code error;
setOption(IntegralOption<Option, T, BoolUnit>(), value, error);
if (error) throw error;
}
template <typename Service>
template <int Option, int NullTerm>
void BasicSocket<Service>::setOption(ArrayOption<Option, NullTerm>, const std::string_view &buffer,
boost::system::error_code &error) {
m_service.setOption(m_impl, Option, buffer.data(), buffer.size(), error);
}
template <typename Service>
template <int Option, class T, bool BoolUnit>
T BasicSocket<Service>::option(IntegralOption<Option, T, BoolUnit>, boost::system::error_code &error) const {
static_assert(std::is_integral<T>::value, "T must be integral");
T value;
size_t size = sizeof value;
m_service.option(m_impl, Option, &value, &size, error);
assert(size == sizeof value);
return value;
}
template <typename Service>
template <int Option, class T, bool BoolUnit>
T BasicSocket<Service>::option(IntegralOption<Option, T, BoolUnit>) const {
boost::system::error_code error;
auto ret = option(IntegralOption<Option, T, BoolUnit>(), error);
if (error) throw error;
return ret;
}
template <typename Service>
boost::asio::io_context &BasicSocket<Service>::ioContext() const {
return m_service.get_io_context();
}
template <typename Service>
size_t BasicSocket<Service>::send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error) {
const int nbytes = zmq_send(m_impl->socket, buffer.data(), buffer.size(), static_cast<int>(flags));
if (nbytes >= 0) return static_cast<size_t>(nbytes);
error = makeErrorCode();
return nbytes;
}
template <typename Service>
template <typename ConstBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
BasicSocket<Service>::send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error) {
size_t res = 0;
auto last = std::distance(std::begin(buffers), std::end(buffers)) - 1;
auto index = 0u;
for (auto it = std::begin(buffers); it != std::end(buffers); ++it, ++index) {
auto f = index == last ? static_cast<int>(flags) : static_cast<int>(flags) | ZMQ_SNDMORE;
res += send(*it, static_cast<SendFlags>(f), error);
if (error) return 0u;
}
return res;
}
template <typename Service>
template <typename ConstBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
BasicSocket<Service>::send(const ConstBufferSequence &buffers, SendFlags flags) {
boost::system::error_code error;
auto size = send(buffers, flags, error);
if (error) throw error;
return size;
}
template <typename Service>
size_t BasicSocket<Service>::send(Message &&message, SendFlags flags, boost::system::error_code &error) {
int nbytes = zmq_msg_send(message.handle(), m_impl->socket, static_cast<int>(flags));
if (nbytes >= 0) return static_cast<size_t>(nbytes);
error = makeErrorCode();
return nbytes;
}
template <typename Service>
size_t BasicSocket<Service>::send(Message &&message, SendFlags flags) {
boost::system::error_code error;
auto size = send(std::move(message), flags, error);
if (error) throw error;
return size;
}
template <typename Service>
std::size_t BasicSocket<Service>::receive(Message &message, RecvFlags flags, boost::system::error_code &error) {
std::lock_guard lock_guard(m_impl->mutex);
BOOST_ASSERT_MSG(m_impl->socket, "Invalid socket");
auto size = zmq_msg_recv(message.handle(), m_impl->socket, static_cast<int>(flags));
if (size < 0) error = makeErrorCode();
return size;
}
template <typename Service>
std::size_t BasicSocket<Service>::receive(Message &message, RecvFlags flags) {
boost::system::error_code error;
auto size = receive(message, flags, error);
if (error) throw error;
return size;
}
template <typename Service>
size_t BasicSocket<Service>::receive(boost::asio::mutable_buffer buffer, RecvFlags flags,
boost::system::error_code &error) {
const int nbytes = zmq_recv(m_impl->socket, buffer.data(), buffer.size(), static_cast<int>(flags));
if (nbytes >= 0) return nbytes;
error = makeErrorCode();
return nbytes;
}
template <typename Service>
template <typename MutableBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
BasicSocket<Service>::receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error) {
std::vector<size_t> ret;
auto iterator = std::begin(buffers);
auto f = static_cast<int>(flags);
do {
auto size = receive(*iterator, flags, error);
if (error) return ret;
ret.push_back(size);
f |= ZMQ_RCVMORE;
++iterator;
} while ((iterator != std::end(buffers)) && option(ReceiveMore));
if (option(ReceiveMore)) error = makeErrorCode(boost::system::errc::no_buffer_space);
return ret;
}
template <typename Service>
template <typename MutableBufferSequence>
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
BasicSocket<Service>::receive(const MutableBufferSequence &buffers, RecvFlags flags) {
boost::system::error_code error;
auto size = receive(buffers, flags, error);
if (error) throw error;
return size;
}
template <typename Service>
size_t BasicSocket<Service>::receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags) {
boost::system::error_code error;
auto size = receive(std::move(buffer), flags, error);
if (error) throw error;
return size;
}
template <typename Service>
template <typename MutableBufferSequence, typename ReadHandler>
void BasicSocket<Service>::asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler) {
// using namespace boost::asio::posix;
if (option(Events) & ZMQ_POLLIN) {
boost::asio::post(m_service.get_io_context(), [&buffers, this, handler{std::move(handler)}]() {
boost::system::error_code error;
auto size = receive(buffers, RecvFlags::Dontwait, error);
handler(error, size);
});
return;
}
m_impl->descriptor->async_wait(StreamType::wait_read, [this, &buffers, handler{std::move(handler)}](
const boost::system::error_code &waitError) {
if (waitError) {
handler(waitError, {});
return;
}
if (option(Events) & ZMQ_POLLIN) {
boost::system::error_code error;
auto size = receive(buffers, RecvFlags::Dontwait, error);
return handler(error, size);
} else {
asyncReceive(buffers, handler);
}
});
}
template <typename Service>
template <typename ReadHandler>
void BasicSocket<Service>::asyncReceive(Message &message, ReadHandler &&handler) {
// using namespace boost::asio::posix;
if (option(Events) & ZMQ_POLLIN) {
boost::asio::post(m_service.get_io_context(), [&message, this, handler{std::move(handler)}]() {
boost::system::error_code error;
auto size = receive(message, RecvFlags::Dontwait, error);
handler(error, size);
});
return;
}
m_impl->descriptor->async_wait(StreamType::wait_read, [this, &message, handler{std::move(handler)}](
const boost::system::error_code &waitError) {
if (waitError) {
handler(waitError, 0);
return;
}
if (option(Events) & ZMQ_POLLIN) {
boost::system::error_code error;
auto size = receive(message, RecvFlags::Dontwait, error);
return handler(error, size);
} else {
asyncReceive(message, handler);
}
});
}
template <typename Service>
template <bool CheckN, class OutputIt>
size_t BasicSocket<Service>::receiveMultipart(OutputIt &out, size_t n, RecvFlags flags,
boost::system::error_code &error) {
size_t msg_count = 0;
Message message;
while (true) {
if (CheckN) {
if (msg_count >= n) throw std::runtime_error("Too many message parts in recv_multipart_n");
}
receive(message, flags, error);
if (error) return msg_count;
++msg_count;
const bool more = message.more();
*out++ = std::move(message);
if (!more) break;
}
return msg_count;
}
template <typename Service>
template <class OutputIt, typename ReadHandler>
void BasicSocket<Service>::asyncReceiveMultipart(OutputIt out, ReadHandler &&handler) {
// using namespace boost::asio::posix;
boost::system::error_code error;
if (option(Events) & ZMQ_POLLIN) {
auto size = receiveMultipart<false>(out, 0, RecvFlags::Dontwait, error);
return handler(error, size);
}
m_impl->descriptor->async_wait(StreamType::wait_read, [this, out, handler{std::move(handler)}](
const boost::system::error_code &waitError) mutable {
if (waitError) {
handler(waitError, 0);
return;
}
if ((option(Events) & ZMQ_POLLIN) == 0) {
asyncReceiveMultipart(out, handler);
return;
}
size_t size = 0;
boost::system::error_code error;
size += receiveMultipart<false>(out, 0, RecvFlags::Dontwait, error);
if (!error || error.value() != EAGAIN) return handler(error, size);
});
}
} // namespace ZeroMQ
#endif // BASICSOCKET_INL

View File

@ -1,29 +0,0 @@
add_library(AsioZeroMQ
BasicSocket.h BasicSocket.inl
ErrorCode.h ErrorCode.cpp
Message.h Message.cpp
Options.h
SocketService.h SocketService.cpp
ZeroMQSocket.h
)
target_include_directories(AsioZeroMQ
INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}
PUBLIC ${ZeroMQ_INCLUDE_DIR}
)
target_link_directories(AsioZeroMQ
PUBLIC ${ZeroMQ_LIBRARY_DIRS}
)
target_link_libraries(AsioZeroMQ
PUBLIC ${Boost_LIBRARIES}
PUBLIC Universal
PUBLIC ${ZeroMQ_LIBRARIES}
)
if(UNIX)
target_compile_options(AsioZeroMQ
PRIVATE -fPIC
)
endif()

View File

@ -1,13 +0,0 @@
#include "ErrorCode.h"
namespace ZeroMQ {
boost::system::error_code makeErrorCode(int ev) {
static ErrorCategory cat;
return boost::system::error_code(ev, cat);
}
const char *ErrorCategory::name() const noexcept { return "ZeroMQ"; }
std::string ErrorCategory::message(int ev) const { return std::string(zmq_strerror(ev)); }
} // namespace ZeroMQ

View File

@ -1,18 +0,0 @@
#ifndef ERRORCODE_H
#define ERRORCODE_H
#include <boost/system/error_code.hpp>
#include <zmq.h>
namespace ZeroMQ {
class ErrorCategory : public boost::system::error_category {
public:
const char *name() const noexcept override;
std::string message(int ev) const override;
};
boost::system::error_code makeErrorCode(int ev = zmq_errno());
} // namespace ZeroMQ
#endif // ERRORCODE_H

View File

@ -1,85 +0,0 @@
#include "Message.h"
#include <boost/assert.hpp>
#include <iomanip>
#include <sstream>
namespace ZeroMQ {
Message::Message() noexcept { zmq_msg_init(&m_message); }
Message::~Message() noexcept {
int rc = zmq_msg_close(&m_message);
BOOST_ASSERT_MSG(rc == 0, "init failed.");
}
Message::Message(Message &&rhs) noexcept : m_message(rhs.m_message) {
int rc = zmq_msg_init(&rhs.m_message);
BOOST_ASSERT(rc == 0);
}
Message &Message::operator=(Message &&rhs) noexcept {
std::swap(m_message, rhs.m_message);
return *this;
}
bool Message::operator==(const Message &other) const noexcept {
const size_t my_size = size();
return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size);
}
bool Message::operator!=(const Message &other) const noexcept { return !(*this == other); }
Message::Message(const void *dataSrc, size_t size) {
int rc = zmq_msg_init_size(&m_message, size);
if (rc != 0) throw makeErrorCode();
if (size) {
// this constructor allows (nullptr, 0), memcpy with a null pointer is UB
memcpy(data(), dataSrc, size);
}
}
void Message::copy(Message &message) {
int rc = zmq_msg_copy(&m_message, message.handle());
if (rc != 0) throw makeErrorCode();
}
std::string Message::dump() const {
// Partly mutuated from the same method in zmq::multipart_t
std::ostringstream os;
const unsigned char *msg_data = this->data<unsigned char>();
unsigned char byte;
size_t size = this->size();
int is_ascii[2] = {0, 0};
os << "Message [size " << std::dec << std::setw(3) << std::setfill('0') << size << "] (";
// Totally arbitrary
if (size >= 1000) {
os << "... too big to print)";
} else {
while (size--) {
byte = *msg_data++;
is_ascii[1] = (byte >= 32 && byte < 127);
if (is_ascii[1] != is_ascii[0]) os << " "; // Separate text/non text
if (is_ascii[1]) {
os << byte;
} else {
os << std::hex << std::uppercase << std::setw(2) << std::setfill('0') << static_cast<short>(byte);
}
is_ascii[0] = is_ascii[1];
}
os << ")";
}
return os.str();
}
} // namespace ZeroMQ
namespace std {
ostream &operator<<(ostream &stream, const ZeroMQ::Message &message) {
stream << message.dump();
return stream;
}
} // namespace std

View File

@ -1,84 +0,0 @@
#ifndef MESSAGE_H
#define MESSAGE_H
#include "ErrorCode.h"
#include <string_view>
#include <zmq.h>
namespace ZeroMQ {
class Message {
public:
Message() noexcept;
~Message() noexcept;
Message(Message &&rhs) noexcept;
Message &operator=(Message &&rhs) noexcept;
bool operator==(const Message &other) const noexcept;
bool operator!=(const Message &other) const noexcept;
Message(size_t size) {
int rc = zmq_msg_init_size(&m_message, size);
if (rc != 0) throw makeErrorCode();
}
Message(const void *dataSrc, size_t size);
Message(std::string_view string) : Message(string.data(), string.size()) {
}
void *data() noexcept {
return zmq_msg_data(&m_message);
}
const void *data() const noexcept {
return zmq_msg_data(const_cast<zmq_msg_t *>(&m_message));
}
template <typename T = void *>
T const *data() const noexcept {
return static_cast<T const *>(data());
}
size_t size() const noexcept {
return zmq_msg_size(const_cast<zmq_msg_t *>(&m_message));
}
zmq_msg_t *handle() noexcept {
return &m_message;
}
const zmq_msg_t *handle() const noexcept {
return &m_message;
}
bool more() const noexcept {
int rc = zmq_msg_more(const_cast<zmq_msg_t *>(&m_message));
return rc != 0;
}
/**
* @brief copy from message
*/
void copy(Message &message);
/**
* @brief zeromq的字符串不是以\0,std::string_view::data()
*
* @return std::string_view
*/
std::string_view toStringView() const noexcept {
return std::string_view(static_cast<const char *>(data()), size());
}
std::string dump() const;
protected:
Message(const Message &) = delete;
void operator=(const Message &) = delete;
private:
// The underlying message
zmq_msg_t m_message;
};
} // namespace ZeroMQ
namespace std {
ostream &operator<<(ostream &stream, const ZeroMQ::Message &message);
}
#endif // MESSAGE_H

View File

@ -1,65 +0,0 @@
#ifndef OPTIONS_H
#define OPTIONS_H
#include <zmq.h>
namespace ZeroMQ {
enum class SocketType : int {
Req = ZMQ_REQ,
Rep = ZMQ_REP,
Dealer = ZMQ_DEALER,
Router = ZMQ_ROUTER,
Pub = ZMQ_PUB,
Sub = ZMQ_SUB,
Xpub = ZMQ_XPUB,
Xsub = ZMQ_XSUB,
Push = ZMQ_PUSH,
Pull = ZMQ_PULL,
Stream = ZMQ_STREAM,
Pair = ZMQ_PAIR
};
enum class SendFlags : int {
None = 0,
Dontwait = ZMQ_DONTWAIT,
Sndmore = ZMQ_SNDMORE,
};
enum class RecvFlags : int {
None = 0,
Dontwait = ZMQ_DONTWAIT,
};
// BoolUnit: if true accepts values of type bool (but passed as T into libzmq)
template <int Option, class T, bool BoolUnit = false>
struct IntegralOption {};
// NullTerm:
// 0: binary data
// 1: null-terminated string (`getsockopt` size includes null)
// 2: binary (size 32) or Z85 encoder string of size 41 (null included)
template <int Option, int NullTerm = 1>
struct ArrayOption {};
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_LINGER, linger, int);
using LingerType = IntegralOption<ZMQ_LINGER, int, false>;
inline constexpr LingerType Linger;
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVHWM, rcvhwm, int);
using ReceiveHighWaterMarkType = IntegralOption<ZMQ_RCVHWM, int, false>;
inline constexpr ReceiveHighWaterMarkType ReceiveHighWaterMark;
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_EVENTS, events, int);
using EventsType = IntegralOption<ZMQ_EVENTS, int, false>;
inline constexpr EventsType Events;
// ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_RCVMORE, rcvmore, int);
using ReceiveMoreType = IntegralOption<ZMQ_RCVMORE, int, true>;
inline constexpr ReceiveMoreType ReceiveMore;
// ZMQ_DEFINE_ARRAY_OPT(ZMQ_SUBSCRIBE, subscribe);
using SubscribeType = ArrayOption<ZMQ_SUBSCRIBE>;
inline constexpr SubscribeType Subscribe;
} // namespace ZeroMQ
#endif // OPTIONS_H

View File

@ -1,62 +0,0 @@
#include "SocketService.h"
#include "ErrorCode.h"
#include <boost/assert.hpp>
#include <zmq.h>
#include "BoostLog.h"
namespace ZeroMQ {
SocketService::SocketService(boost::asio::io_context &context) : boost::asio::io_context::service(context) {
m_context = zmq_ctx_new();
}
SocketService::~SocketService() { zmq_ctx_term(m_context); }
void SocketService::construct(ImplementationType &impl, SocketType type) {
impl = std::make_shared<Implementation>(m_context, type, get_io_context());
}
void SocketService::destroy(ImplementationType &impl) { impl.reset(); }
void SocketService::connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error) {
std::lock_guard lock_guard(impl->mutex);
BOOST_ASSERT_MSG(impl->socket, "invalid socket");
auto status = zmq_connect(impl->socket, endpoint.data());
if (status < 0) error = makeErrorCode();
}
void SocketService::setOption(SocketService::ImplementationType &impl, int option, const void *optval, size_t optvallen,
boost::system::error_code &error) {
int status = zmq_setsockopt(impl->socket, option, optval, optvallen);
if (status < 0) error = makeErrorCode();
}
void SocketService::option(const ImplementationType &impl, int option, void *optval, size_t *optvallen,
boost::system::error_code &error) {
int rc = zmq_getsockopt(impl->socket, option, optval, optvallen);
if (rc != 0) error = makeErrorCode();
}
SocketService::Implementation::Implementation(void *context, SocketType type, boost::asio::io_context &ioContext) {
socket = zmq_socket(context, static_cast<int>(type));
NativeHandleType handle = 0;
auto size = sizeof(handle);
auto rc = zmq_getsockopt(socket, ZMQ_FD, &handle, &size);
if (rc < 0) {
throw makeErrorCode();
}
#if !defined BOOST_ASIO_WINDOWS
descriptor.reset(new StreamType(ioContext, handle));
#else
// Use duplicated SOCKET, because ASIO socket takes ownership over it so destroys one in dtor.
::WSAPROTOCOL_INFOW pi;
::WSADuplicateSocketW(handle, ::GetCurrentProcessId(), &pi);
handle = ::WSASocketW(pi.iAddressFamily /*AF_INET*/, pi.iSocketType /*SOCK_STREAM*/, pi.iProtocol /*IPPROTO_TCP*/,
&pi, 0, 0);
descriptor.reset(new boost::asio::ip::tcp::socket(ioContext, boost::asio::ip::tcp::v4(), handle));
#endif
}
SocketService::Implementation::~Implementation() { zmq_close(socket); }
} // namespace ZeroMQ

View File

@ -1,63 +0,0 @@
#ifndef SOCKETSERVICE_H
#define SOCKETSERVICE_H
#include "Options.h"
#include <boost/asio/execution_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <mutex>
namespace ZeroMQ {
#if !defined BOOST_ASIO_WINDOWS
struct StreamDescriptorClose {
void operator()(boost::asio::posix::stream_descriptor *descriptor) {
descriptor->release();
delete descriptor;
}
};
using StreamDescriptor = std::unique_ptr<boost::asio::posix::stream_descriptor, StreamDescriptorClose>;
using StreamType = boost::asio::posix::stream_descriptor;
using NativeHandleType = boost::asio::posix::stream_descriptor::native_handle_type;
#else
using StreamDescriptor = std::unique_ptr<boost::asio::ip::tcp::socket>;
using StreamType = boost::asio::ip::tcp::socket;
using NativeHandleType = boost::asio::ip::tcp::socket::native_handle_type;
#endif
class SocketService : public boost::asio::io_context::service {
public:
inline static boost::asio::execution_context::id id;
SocketService(boost::asio::io_context &context);
~SocketService();
class Implementation {
public:
Implementation(void *context, SocketType type, boost::asio::io_context &ioContext);
~Implementation();
void *socket;
StreamDescriptor descriptor;
std::mutex mutex;
};
using ImplementationType = std::shared_ptr<Implementation>;
void construct(ImplementationType &impl, SocketType type);
void destroy(ImplementationType &impl);
void connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error);
void setOption(ImplementationType &impl, int option, const void *optval, size_t optvallen,
boost::system::error_code &error);
void option(const ImplementationType &impl, int option, void *optval, size_t *optvallen,
boost::system::error_code &error);
/// Destroy all user-defined handler objects owned by the service.
void shutdown() {}
private:
void *m_context;
};
} // namespace ZeroMQ
#endif // SOCKETSERVICE_H

View File

@ -1,10 +0,0 @@
#ifndef SOCKET_H
#define SOCKET_H
#include "BasicSocket.h"
#include "SocketService.h"
namespace ZeroMQ {
using Socket = BasicSocket<SocketService>;
}
#endif // SOCKET_H

View File

@ -4,12 +4,17 @@ project(Kylin)
set(OpenSSL_LIBRARY ssl crypto) set(OpenSSL_LIBRARY ssl crypto)
set(ZeroMQ_INCLUDE_DIR ${ZeroMQ_ROOT}/include)
set(ZeroMQ_LIBRARY_DIRS ${ZeroMQ_ROOT}/lib)
set(ZeroMQ_LIBRARIES zmq)
add_subdirectory(AsioZeroMQ) if(TARGET Boost::serialization)
add_subdirectory(Encrypt) add_subdirectory(Encrypt)
endif()
if(TARGET Boost::url)
add_subdirectory(HttpProxy) add_subdirectory(HttpProxy)
endif()
add_subdirectory(QtComponets) add_subdirectory(QtComponets)
if(TARGET Boost::log)
add_subdirectory(Universal) add_subdirectory(Universal)
endif()

View File

@ -1,4 +1,5 @@
find_package(Qt6 COMPONENTS Gui REQUIRED) find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Gui)
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Gui)
set(CMAKE_AUTOMOC ON) set(CMAKE_AUTOMOC ON)
@ -12,5 +13,5 @@ target_include_directories(QtComponets
) )
target_link_libraries(QtComponets target_link_libraries(QtComponets
PUBLIC Qt6::Gui PUBLIC Qt${QT_VERSION_MAJOR}::Gui
) )