add library 'AsioZeroMQ'.
This commit is contained in:
parent
64c387aba9
commit
4c1db38c8e
94
AsioZeroMQ/BasicSocket.h
Normal file
94
AsioZeroMQ/BasicSocket.h
Normal file
@ -0,0 +1,94 @@
|
||||
#ifndef BASICSOCKET_H
|
||||
#define BASICSOCKET_H
|
||||
|
||||
#include "Message.h"
|
||||
#include "Options.h"
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/execution_context.hpp>
|
||||
#include <boost/range/metafunctions.hpp>
|
||||
#include <zmq.h>
|
||||
|
||||
namespace ZeroMQ {
|
||||
|
||||
template <typename Service>
|
||||
class BasicSocket {
|
||||
public:
|
||||
using ImplementationType = typename Service::ImplementationType;
|
||||
BasicSocket(boost::asio::io_context &context, SocketType type);
|
||||
void connect(std::string_view address, boost::system::error_code &error);
|
||||
void connect(std::string_view address);
|
||||
|
||||
/**
|
||||
* @brief only see the instance is nullptr or not.
|
||||
*/
|
||||
bool connected() const;
|
||||
|
||||
void bind(std::string_view address, boost::system::error_code &error);
|
||||
void bind(std::string_view address);
|
||||
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
void setOption(IntegralOption<Option, T, BoolUnit>, const T &value, boost::system::error_code &error);
|
||||
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
void setOption(IntegralOption<Option, T, BoolUnit>, const T &value);
|
||||
|
||||
template <int Option, int NullTerm>
|
||||
void setOption(ArrayOption<Option, NullTerm>, const std::string_view &buffer, boost::system::error_code &error);
|
||||
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
T option(IntegralOption<Option, T, BoolUnit>, boost::system::error_code &error) const;
|
||||
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
T option(IntegralOption<Option, T, BoolUnit>) const;
|
||||
|
||||
size_t send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error);
|
||||
|
||||
template <typename ConstBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
|
||||
send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error);
|
||||
|
||||
template <typename ConstBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
|
||||
send(const ConstBufferSequence &buffers, SendFlags flags = SendFlags::None);
|
||||
|
||||
size_t send(Message &&message, SendFlags flags, boost::system::error_code &error);
|
||||
size_t send(Message &&message, SendFlags flags);
|
||||
|
||||
std::size_t receive(Message &message, RecvFlags flags, boost::system::error_code &error);
|
||||
std::size_t receive(Message &message, RecvFlags flags = RecvFlags::None);
|
||||
|
||||
size_t receive(boost::asio::mutable_buffer buffer, RecvFlags flags, boost::system::error_code &error);
|
||||
size_t receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags = RecvFlags::None);
|
||||
|
||||
template <typename MutableBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
|
||||
receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error);
|
||||
|
||||
template <typename MutableBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
|
||||
receive(const MutableBufferSequence &buffers, RecvFlags flags = RecvFlags::None);
|
||||
|
||||
// void read_handler(const boost::system::error_code& ec,std::size_t bytes_transferred)
|
||||
template <typename ReadHandler>
|
||||
void asyncReceive(Message &message, ReadHandler &&handler);
|
||||
|
||||
template <typename MutableBufferSequence, typename ReadHandler>
|
||||
void asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler);
|
||||
|
||||
template <bool CheckN, class OutputIt>
|
||||
size_t receiveMultipart(OutputIt &out, size_t n, RecvFlags flags, boost::system::error_code &error);
|
||||
|
||||
template <class OutputIt, typename ReadHandler>
|
||||
void asyncReceiveMultipart(OutputIt out, ReadHandler &&handler);
|
||||
|
||||
boost::asio::io_context &ioContext() const;
|
||||
~BasicSocket();
|
||||
|
||||
private:
|
||||
Service &m_service;
|
||||
ImplementationType m_impl;
|
||||
};
|
||||
|
||||
} // namespace ZeroMQ
|
||||
#include "BasicSocket.inl"
|
||||
#endif // BASICSOCKET_H
|
331
AsioZeroMQ/BasicSocket.inl
Normal file
331
AsioZeroMQ/BasicSocket.inl
Normal file
@ -0,0 +1,331 @@
|
||||
#ifndef BASICSOCKET_INL
|
||||
#define BASICSOCKET_INL
|
||||
|
||||
#include "BasicSocket.h"
|
||||
#include "ErrorCode.h"
|
||||
#include "SocketService.h"
|
||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <zmq.h>
|
||||
|
||||
#include "BoostLog.h"
|
||||
|
||||
namespace ZeroMQ {
|
||||
|
||||
template <typename Service>
|
||||
BasicSocket<Service>::BasicSocket(boost::asio::io_context &context, SocketType type)
|
||||
: m_service(boost::asio::use_service<Service>(context)) {
|
||||
m_service.construct(m_impl, type);
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
BasicSocket<Service>::~BasicSocket() {
|
||||
m_service.destroy(m_impl);
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
void BasicSocket<Service>::connect(std::string_view address, boost::system::error_code &error) {
|
||||
m_service.connect(m_impl, std::move(address), error);
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
void BasicSocket<Service>::connect(std::string_view address) {
|
||||
boost::system::error_code error;
|
||||
connect(std::move(address), error);
|
||||
if (error) throw error;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
bool BasicSocket<Service>::connected() const {
|
||||
return m_impl != nullptr && m_impl->socket != nullptr;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
void BasicSocket<Service>::bind(std::string_view address, boost::system::error_code &error) {
|
||||
std::lock_guard lock_guard(m_impl->mutex);
|
||||
auto status = zmq_bind(m_impl->socket, address.data());
|
||||
if (status < 0) {
|
||||
error = makeErrorCode();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
void BasicSocket<Service>::bind(std::string_view address) {
|
||||
boost::system::error_code error;
|
||||
bind(std::move(address), error);
|
||||
if (error) throw error;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
void BasicSocket<Service>::setOption(IntegralOption<Option, T, BoolUnit>, const T &value,
|
||||
boost::system::error_code &error) {
|
||||
static_assert(std::is_integral<T>::value, "T must be integral");
|
||||
m_service.setOption(m_impl, Option, &value, sizeof(value), error);
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
void BasicSocket<Service>::setOption(IntegralOption<Option, T, BoolUnit>, const T &value) {
|
||||
boost::system::error_code error;
|
||||
setOption(IntegralOption<Option, T, BoolUnit>(), value, error);
|
||||
if (error) throw error;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <int Option, int NullTerm>
|
||||
void BasicSocket<Service>::setOption(ArrayOption<Option, NullTerm>, const std::string_view &buffer,
|
||||
boost::system::error_code &error) {
|
||||
m_service.setOption(m_impl, Option, buffer.data(), buffer.size(), error);
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
T BasicSocket<Service>::option(IntegralOption<Option, T, BoolUnit>, boost::system::error_code &error) const {
|
||||
static_assert(std::is_integral<T>::value, "T must be integral");
|
||||
T value;
|
||||
size_t size = sizeof value;
|
||||
m_service.option(m_impl, Option, &value, &size, error);
|
||||
|
||||
assert(size == sizeof value);
|
||||
return value;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <int Option, class T, bool BoolUnit>
|
||||
T BasicSocket<Service>::option(IntegralOption<Option, T, BoolUnit>) const {
|
||||
boost::system::error_code error;
|
||||
auto ret = option(IntegralOption<Option, T, BoolUnit>(), error);
|
||||
if (error) throw error;
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
boost::asio::io_context &BasicSocket<Service>::ioContext() const {
|
||||
return m_service.get_io_context();
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
size_t BasicSocket<Service>::send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error) {
|
||||
const int nbytes = zmq_send(m_impl->socket, buffer.data(), buffer.size(), static_cast<int>(flags));
|
||||
if (nbytes >= 0) return static_cast<size_t>(nbytes);
|
||||
error = makeErrorCode();
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename ConstBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
|
||||
BasicSocket<Service>::send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error) {
|
||||
size_t res = 0;
|
||||
auto last = std::distance(std::begin(buffers), std::end(buffers)) - 1;
|
||||
auto index = 0u;
|
||||
for (auto it = std::begin(buffers); it != std::end(buffers); ++it, ++index) {
|
||||
auto f = index == last ? static_cast<int>(flags) : static_cast<int>(flags) | ZMQ_SNDMORE;
|
||||
res += send(*it, static_cast<SendFlags>(f), error);
|
||||
if (error) return 0u;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename ConstBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<ConstBufferSequence>, size_t>::type
|
||||
BasicSocket<Service>::send(const ConstBufferSequence &buffers, SendFlags flags) {
|
||||
boost::system::error_code error;
|
||||
auto size = send(buffers, flags, error);
|
||||
if (error) throw error;
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
size_t BasicSocket<Service>::send(Message &&message, SendFlags flags, boost::system::error_code &error) {
|
||||
int nbytes = zmq_msg_send(message.handle(), m_impl->socket, static_cast<int>(flags));
|
||||
if (nbytes >= 0) return static_cast<size_t>(nbytes);
|
||||
error = makeErrorCode();
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
size_t BasicSocket<Service>::send(Message &&message, SendFlags flags) {
|
||||
boost::system::error_code error;
|
||||
auto size = send(std::move(message), flags, error);
|
||||
if (error) throw error;
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
std::size_t BasicSocket<Service>::receive(Message &message, RecvFlags flags, boost::system::error_code &error) {
|
||||
std::lock_guard lock_guard(m_impl->mutex);
|
||||
BOOST_ASSERT_MSG(m_impl->socket, "Invalid socket");
|
||||
auto size = zmq_msg_recv(message.handle(), m_impl->socket, static_cast<int>(flags));
|
||||
if (size < 0) error = makeErrorCode();
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
std::size_t BasicSocket<Service>::receive(Message &message, RecvFlags flags) {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(message, flags, error);
|
||||
if (error) throw error;
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
size_t BasicSocket<Service>::receive(boost::asio::mutable_buffer buffer, RecvFlags flags,
|
||||
boost::system::error_code &error) {
|
||||
const int nbytes = zmq_recv(m_impl->socket, buffer.data(), buffer.size(), static_cast<int>(flags));
|
||||
if (nbytes >= 0) return nbytes;
|
||||
|
||||
error = makeErrorCode();
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename MutableBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
|
||||
BasicSocket<Service>::receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error) {
|
||||
std::vector<size_t> ret;
|
||||
auto iterator = std::begin(buffers);
|
||||
auto f = static_cast<int>(flags);
|
||||
do {
|
||||
auto size = receive(*iterator, flags, error);
|
||||
if (error) return ret;
|
||||
ret.push_back(size);
|
||||
|
||||
f |= ZMQ_RCVMORE;
|
||||
++iterator;
|
||||
} while ((iterator != std::end(buffers)) && option(ReceiveMore));
|
||||
if (option(ReceiveMore)) error = makeErrorCode(boost::system::errc::no_buffer_space);
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename MutableBufferSequence>
|
||||
typename boost::enable_if<boost::has_range_const_iterator<MutableBufferSequence>, std::vector<size_t>>::type
|
||||
BasicSocket<Service>::receive(const MutableBufferSequence &buffers, RecvFlags flags) {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(buffers, flags, error);
|
||||
if (error) throw error;
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
size_t BasicSocket<Service>::receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags) {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(std::move(buffer), flags, error);
|
||||
if (error) throw error;
|
||||
return size;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename MutableBufferSequence, typename ReadHandler>
|
||||
void BasicSocket<Service>::asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler) {
|
||||
// using namespace boost::asio::posix;
|
||||
if (option(Events) & ZMQ_POLLIN) {
|
||||
boost::asio::post(m_service.get_io_context(), [&buffers, this, handler{std::move(handler)}]() {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(buffers, RecvFlags::Dontwait, error);
|
||||
handler(error, size);
|
||||
});
|
||||
return;
|
||||
}
|
||||
m_impl->descriptor->async_wait(StreamType::wait_read, [this, &buffers, handler{std::move(handler)}](
|
||||
const boost::system::error_code &waitError) {
|
||||
if (waitError) {
|
||||
handler(waitError, {});
|
||||
return;
|
||||
}
|
||||
if (option(Events) & ZMQ_POLLIN) {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(buffers, RecvFlags::Dontwait, error);
|
||||
return handler(error, size);
|
||||
} else {
|
||||
asyncReceive(buffers, handler);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <typename ReadHandler>
|
||||
void BasicSocket<Service>::asyncReceive(Message &message, ReadHandler &&handler) {
|
||||
// using namespace boost::asio::posix;
|
||||
|
||||
if (option(Events) & ZMQ_POLLIN) {
|
||||
boost::asio::post(m_service.get_io_context(), [&message, this, handler{std::move(handler)}]() {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(message, RecvFlags::Dontwait, error);
|
||||
handler(error, size);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
m_impl->descriptor->async_wait(StreamType::wait_read, [this, &message, handler{std::move(handler)}](
|
||||
const boost::system::error_code &waitError) {
|
||||
if (waitError) {
|
||||
handler(waitError, 0);
|
||||
return;
|
||||
}
|
||||
if (option(Events) & ZMQ_POLLIN) {
|
||||
boost::system::error_code error;
|
||||
auto size = receive(message, RecvFlags::Dontwait, error);
|
||||
return handler(error, size);
|
||||
} else {
|
||||
asyncReceive(message, handler);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <bool CheckN, class OutputIt>
|
||||
size_t BasicSocket<Service>::receiveMultipart(OutputIt &out, size_t n, RecvFlags flags,
|
||||
boost::system::error_code &error) {
|
||||
size_t msg_count = 0;
|
||||
Message message;
|
||||
while (true) {
|
||||
if (CheckN) {
|
||||
if (msg_count >= n) throw std::runtime_error("Too many message parts in recv_multipart_n");
|
||||
}
|
||||
receive(message, flags, error);
|
||||
if (error) return msg_count;
|
||||
|
||||
++msg_count;
|
||||
const bool more = message.more();
|
||||
*out++ = std::move(message);
|
||||
if (!more) break;
|
||||
}
|
||||
|
||||
return msg_count;
|
||||
}
|
||||
|
||||
template <typename Service>
|
||||
template <class OutputIt, typename ReadHandler>
|
||||
void BasicSocket<Service>::asyncReceiveMultipart(OutputIt out, ReadHandler &&handler) {
|
||||
// using namespace boost::asio::posix;
|
||||
boost::system::error_code error;
|
||||
|
||||
if (option(Events) & ZMQ_POLLIN) {
|
||||
auto size = receiveMultipart<false>(out, 0, RecvFlags::Dontwait, error);
|
||||
return handler(error, size);
|
||||
}
|
||||
m_impl->descriptor->async_wait(StreamType::wait_read, [this, out, handler{std::move(handler)}](
|
||||
const boost::system::error_code &waitError) mutable {
|
||||
if (waitError) {
|
||||
handler(waitError, 0);
|
||||
return;
|
||||
}
|
||||
if ((option(Events) & ZMQ_POLLIN) == 0) {
|
||||
asyncReceiveMultipart(out, handler);
|
||||
return;
|
||||
}
|
||||
size_t size = 0;
|
||||
boost::system::error_code error;
|
||||
size += receiveMultipart<false>(out, 0, RecvFlags::Dontwait, error);
|
||||
if (!error || error.value() != EAGAIN) return handler(error, size);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace ZeroMQ
|
||||
|
||||
#endif // BASICSOCKET_INL
|
29
AsioZeroMQ/CMakeLists.txt
Normal file
29
AsioZeroMQ/CMakeLists.txt
Normal file
@ -0,0 +1,29 @@
|
||||
add_library(AsioZeroMQ
|
||||
BasicSocket.h BasicSocket.inl
|
||||
ErrorCode.h ErrorCode.cpp
|
||||
Message.h Message.cpp
|
||||
Options.h
|
||||
SocketService.h SocketService.cpp
|
||||
ZeroMQSocket.h
|
||||
)
|
||||
|
||||
target_include_directories(AsioZeroMQ
|
||||
INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}
|
||||
PUBLIC ${ZeroMQ_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
target_link_directories(AsioZeroMQ
|
||||
PUBLIC ${ZeroMQ_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
target_link_libraries(AsioZeroMQ
|
||||
PUBLIC ${Boost_LIBRARIES}
|
||||
PUBLIC Universal
|
||||
PUBLIC ${ZeroMQ_LIBRARIES}
|
||||
)
|
||||
|
||||
if(UNIX)
|
||||
target_compile_options(AsioZeroMQ
|
||||
PRIVATE -fPIC
|
||||
)
|
||||
endif()
|
13
AsioZeroMQ/ErrorCode.cpp
Normal file
13
AsioZeroMQ/ErrorCode.cpp
Normal file
@ -0,0 +1,13 @@
|
||||
#include "ErrorCode.h"
|
||||
|
||||
namespace ZeroMQ {
|
||||
boost::system::error_code makeErrorCode(int ev) {
|
||||
static ErrorCategory cat;
|
||||
|
||||
return boost::system::error_code(ev, cat);
|
||||
}
|
||||
|
||||
const char *ErrorCategory::name() const noexcept { return "ZeroMQ"; }
|
||||
|
||||
std::string ErrorCategory::message(int ev) const { return std::string(zmq_strerror(ev)); }
|
||||
} // namespace ZeroMQ
|
18
AsioZeroMQ/ErrorCode.h
Normal file
18
AsioZeroMQ/ErrorCode.h
Normal file
@ -0,0 +1,18 @@
|
||||
#ifndef ERRORCODE_H
|
||||
#define ERRORCODE_H
|
||||
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <zmq.h>
|
||||
|
||||
namespace ZeroMQ {
|
||||
|
||||
class ErrorCategory : public boost::system::error_category {
|
||||
public:
|
||||
const char *name() const noexcept override;
|
||||
std::string message(int ev) const override;
|
||||
};
|
||||
|
||||
boost::system::error_code makeErrorCode(int ev = zmq_errno());
|
||||
} // namespace ZeroMQ
|
||||
|
||||
#endif // ERRORCODE_H
|
85
AsioZeroMQ/Message.cpp
Normal file
85
AsioZeroMQ/Message.cpp
Normal file
@ -0,0 +1,85 @@
|
||||
#include "Message.h"
|
||||
#include <boost/assert.hpp>
|
||||
#include <iomanip>
|
||||
#include <sstream>
|
||||
|
||||
namespace ZeroMQ {
|
||||
Message::Message() noexcept { zmq_msg_init(&m_message); }
|
||||
|
||||
Message::~Message() noexcept {
|
||||
int rc = zmq_msg_close(&m_message);
|
||||
BOOST_ASSERT_MSG(rc == 0, "init failed.");
|
||||
}
|
||||
|
||||
Message::Message(Message &&rhs) noexcept : m_message(rhs.m_message) {
|
||||
int rc = zmq_msg_init(&rhs.m_message);
|
||||
BOOST_ASSERT(rc == 0);
|
||||
}
|
||||
|
||||
Message &Message::operator=(Message &&rhs) noexcept {
|
||||
std::swap(m_message, rhs.m_message);
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool Message::operator==(const Message &other) const noexcept {
|
||||
const size_t my_size = size();
|
||||
return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size);
|
||||
}
|
||||
|
||||
bool Message::operator!=(const Message &other) const noexcept { return !(*this == other); }
|
||||
|
||||
Message::Message(const void *dataSrc, size_t size) {
|
||||
int rc = zmq_msg_init_size(&m_message, size);
|
||||
if (rc != 0) throw makeErrorCode();
|
||||
if (size) {
|
||||
// this constructor allows (nullptr, 0), memcpy with a null pointer is UB
|
||||
memcpy(data(), dataSrc, size);
|
||||
}
|
||||
}
|
||||
|
||||
void Message::copy(Message &message) {
|
||||
int rc = zmq_msg_copy(&m_message, message.handle());
|
||||
if (rc != 0) throw makeErrorCode();
|
||||
}
|
||||
|
||||
std::string Message::dump() const {
|
||||
|
||||
// Partly mutuated from the same method in zmq::multipart_t
|
||||
std::ostringstream os;
|
||||
|
||||
const unsigned char *msg_data = this->data<unsigned char>();
|
||||
unsigned char byte;
|
||||
size_t size = this->size();
|
||||
int is_ascii[2] = {0, 0};
|
||||
|
||||
os << "Message [size " << std::dec << std::setw(3) << std::setfill('0') << size << "] (";
|
||||
// Totally arbitrary
|
||||
if (size >= 1000) {
|
||||
os << "... too big to print)";
|
||||
} else {
|
||||
while (size--) {
|
||||
byte = *msg_data++;
|
||||
|
||||
is_ascii[1] = (byte >= 32 && byte < 127);
|
||||
if (is_ascii[1] != is_ascii[0]) os << " "; // Separate text/non text
|
||||
|
||||
if (is_ascii[1]) {
|
||||
os << byte;
|
||||
} else {
|
||||
os << std::hex << std::uppercase << std::setw(2) << std::setfill('0') << static_cast<short>(byte);
|
||||
}
|
||||
is_ascii[0] = is_ascii[1];
|
||||
}
|
||||
os << ")";
|
||||
}
|
||||
return os.str();
|
||||
}
|
||||
|
||||
} // namespace ZeroMQ
|
||||
|
||||
namespace std {
|
||||
ostream &operator<<(ostream &stream, const ZeroMQ::Message &message) {
|
||||
stream << message.dump();
|
||||
return stream;
|
||||
}
|
||||
} // namespace std
|
84
AsioZeroMQ/Message.h
Normal file
84
AsioZeroMQ/Message.h
Normal file
@ -0,0 +1,84 @@
|
||||
#ifndef MESSAGE_H
|
||||
#define MESSAGE_H
|
||||
|
||||
#include "ErrorCode.h"
|
||||
#include <string_view>
|
||||
#include <zmq.h>
|
||||
|
||||
namespace ZeroMQ {
|
||||
|
||||
class Message {
|
||||
public:
|
||||
Message() noexcept;
|
||||
~Message() noexcept;
|
||||
Message(Message &&rhs) noexcept;
|
||||
Message &operator=(Message &&rhs) noexcept;
|
||||
bool operator==(const Message &other) const noexcept;
|
||||
|
||||
bool operator!=(const Message &other) const noexcept;
|
||||
|
||||
Message(size_t size) {
|
||||
int rc = zmq_msg_init_size(&m_message, size);
|
||||
if (rc != 0) throw makeErrorCode();
|
||||
}
|
||||
Message(const void *dataSrc, size_t size);
|
||||
Message(std::string_view string) : Message(string.data(), string.size()) {
|
||||
}
|
||||
|
||||
void *data() noexcept {
|
||||
return zmq_msg_data(&m_message);
|
||||
}
|
||||
const void *data() const noexcept {
|
||||
return zmq_msg_data(const_cast<zmq_msg_t *>(&m_message));
|
||||
}
|
||||
|
||||
template <typename T = void *>
|
||||
T const *data() const noexcept {
|
||||
return static_cast<T const *>(data());
|
||||
}
|
||||
|
||||
size_t size() const noexcept {
|
||||
return zmq_msg_size(const_cast<zmq_msg_t *>(&m_message));
|
||||
}
|
||||
zmq_msg_t *handle() noexcept {
|
||||
return &m_message;
|
||||
}
|
||||
const zmq_msg_t *handle() const noexcept {
|
||||
return &m_message;
|
||||
}
|
||||
bool more() const noexcept {
|
||||
int rc = zmq_msg_more(const_cast<zmq_msg_t *>(&m_message));
|
||||
return rc != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief copy from message
|
||||
*/
|
||||
void copy(Message &message);
|
||||
|
||||
/**
|
||||
* @brief zeromq的字符串不是以\0结束的,所以调用std::string_view::data()可能会导致错误
|
||||
*
|
||||
* @return std::string_view
|
||||
*/
|
||||
std::string_view toStringView() const noexcept {
|
||||
return std::string_view(static_cast<const char *>(data()), size());
|
||||
}
|
||||
|
||||
std::string dump() const;
|
||||
|
||||
protected:
|
||||
Message(const Message &) = delete;
|
||||
void operator=(const Message &) = delete;
|
||||
|
||||
private:
|
||||
// The underlying message
|
||||
zmq_msg_t m_message;
|
||||
};
|
||||
} // namespace ZeroMQ
|
||||
|
||||
namespace std {
|
||||
ostream &operator<<(ostream &stream, const ZeroMQ::Message &message);
|
||||
}
|
||||
|
||||
#endif // MESSAGE_H
|
65
AsioZeroMQ/Options.h
Normal file
65
AsioZeroMQ/Options.h
Normal file
@ -0,0 +1,65 @@
|
||||
#ifndef OPTIONS_H
|
||||
#define OPTIONS_H
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
namespace ZeroMQ {
|
||||
enum class SocketType : int {
|
||||
Req = ZMQ_REQ,
|
||||
Rep = ZMQ_REP,
|
||||
Dealer = ZMQ_DEALER,
|
||||
Router = ZMQ_ROUTER,
|
||||
Pub = ZMQ_PUB,
|
||||
Sub = ZMQ_SUB,
|
||||
Xpub = ZMQ_XPUB,
|
||||
Xsub = ZMQ_XSUB,
|
||||
Push = ZMQ_PUSH,
|
||||
Pull = ZMQ_PULL,
|
||||
Stream = ZMQ_STREAM,
|
||||
Pair = ZMQ_PAIR
|
||||
};
|
||||
|
||||
enum class SendFlags : int {
|
||||
None = 0,
|
||||
Dontwait = ZMQ_DONTWAIT,
|
||||
Sndmore = ZMQ_SNDMORE,
|
||||
};
|
||||
|
||||
enum class RecvFlags : int {
|
||||
None = 0,
|
||||
Dontwait = ZMQ_DONTWAIT,
|
||||
};
|
||||
|
||||
// BoolUnit: if true accepts values of type bool (but passed as T into libzmq)
|
||||
template <int Option, class T, bool BoolUnit = false>
|
||||
struct IntegralOption {};
|
||||
|
||||
// NullTerm:
|
||||
// 0: binary data
|
||||
// 1: null-terminated string (`getsockopt` size includes null)
|
||||
// 2: binary (size 32) or Z85 encoder string of size 41 (null included)
|
||||
template <int Option, int NullTerm = 1>
|
||||
struct ArrayOption {};
|
||||
|
||||
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_LINGER, linger, int);
|
||||
using LingerType = IntegralOption<ZMQ_LINGER, int, false>;
|
||||
inline constexpr LingerType Linger;
|
||||
|
||||
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVHWM, rcvhwm, int);
|
||||
using ReceiveHighWaterMarkType = IntegralOption<ZMQ_RCVHWM, int, false>;
|
||||
inline constexpr ReceiveHighWaterMarkType ReceiveHighWaterMark;
|
||||
|
||||
// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_EVENTS, events, int);
|
||||
using EventsType = IntegralOption<ZMQ_EVENTS, int, false>;
|
||||
inline constexpr EventsType Events;
|
||||
|
||||
// ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_RCVMORE, rcvmore, int);
|
||||
using ReceiveMoreType = IntegralOption<ZMQ_RCVMORE, int, true>;
|
||||
inline constexpr ReceiveMoreType ReceiveMore;
|
||||
|
||||
// ZMQ_DEFINE_ARRAY_OPT(ZMQ_SUBSCRIBE, subscribe);
|
||||
using SubscribeType = ArrayOption<ZMQ_SUBSCRIBE>;
|
||||
inline constexpr SubscribeType Subscribe;
|
||||
|
||||
} // namespace ZeroMQ
|
||||
#endif // OPTIONS_H
|
62
AsioZeroMQ/SocketService.cpp
Normal file
62
AsioZeroMQ/SocketService.cpp
Normal file
@ -0,0 +1,62 @@
|
||||
#include "SocketService.h"
|
||||
#include "ErrorCode.h"
|
||||
#include <boost/assert.hpp>
|
||||
#include <zmq.h>
|
||||
|
||||
#include "BoostLog.h"
|
||||
|
||||
namespace ZeroMQ {
|
||||
SocketService::SocketService(boost::asio::io_context &context) : boost::asio::io_context::service(context) {
|
||||
m_context = zmq_ctx_new();
|
||||
}
|
||||
|
||||
SocketService::~SocketService() { zmq_ctx_term(m_context); }
|
||||
|
||||
void SocketService::construct(ImplementationType &impl, SocketType type) {
|
||||
impl = std::make_shared<Implementation>(m_context, type, get_io_context());
|
||||
}
|
||||
|
||||
void SocketService::destroy(ImplementationType &impl) { impl.reset(); }
|
||||
|
||||
void SocketService::connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error) {
|
||||
std::lock_guard lock_guard(impl->mutex);
|
||||
BOOST_ASSERT_MSG(impl->socket, "invalid socket");
|
||||
auto status = zmq_connect(impl->socket, endpoint.data());
|
||||
if (status < 0) error = makeErrorCode();
|
||||
}
|
||||
|
||||
void SocketService::setOption(SocketService::ImplementationType &impl, int option, const void *optval, size_t optvallen,
|
||||
boost::system::error_code &error) {
|
||||
int status = zmq_setsockopt(impl->socket, option, optval, optvallen);
|
||||
if (status < 0) error = makeErrorCode();
|
||||
}
|
||||
|
||||
void SocketService::option(const ImplementationType &impl, int option, void *optval, size_t *optvallen,
|
||||
boost::system::error_code &error) {
|
||||
int rc = zmq_getsockopt(impl->socket, option, optval, optvallen);
|
||||
if (rc != 0) error = makeErrorCode();
|
||||
}
|
||||
|
||||
SocketService::Implementation::Implementation(void *context, SocketType type, boost::asio::io_context &ioContext) {
|
||||
socket = zmq_socket(context, static_cast<int>(type));
|
||||
NativeHandleType handle = 0;
|
||||
auto size = sizeof(handle);
|
||||
auto rc = zmq_getsockopt(socket, ZMQ_FD, &handle, &size);
|
||||
if (rc < 0) {
|
||||
throw makeErrorCode();
|
||||
}
|
||||
#if !defined BOOST_ASIO_WINDOWS
|
||||
descriptor.reset(new StreamType(ioContext, handle));
|
||||
#else
|
||||
// Use duplicated SOCKET, because ASIO socket takes ownership over it so destroys one in dtor.
|
||||
::WSAPROTOCOL_INFOW pi;
|
||||
::WSADuplicateSocketW(handle, ::GetCurrentProcessId(), &pi);
|
||||
handle = ::WSASocketW(pi.iAddressFamily /*AF_INET*/, pi.iSocketType /*SOCK_STREAM*/, pi.iProtocol /*IPPROTO_TCP*/,
|
||||
&pi, 0, 0);
|
||||
descriptor.reset(new boost::asio::ip::tcp::socket(ioContext, boost::asio::ip::tcp::v4(), handle));
|
||||
#endif
|
||||
}
|
||||
|
||||
SocketService::Implementation::~Implementation() { zmq_close(socket); }
|
||||
|
||||
} // namespace ZeroMQ
|
63
AsioZeroMQ/SocketService.h
Normal file
63
AsioZeroMQ/SocketService.h
Normal file
@ -0,0 +1,63 @@
|
||||
#ifndef SOCKETSERVICE_H
|
||||
#define SOCKETSERVICE_H
|
||||
|
||||
#include "Options.h"
|
||||
#include <boost/asio/execution_context.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||
#include <mutex>
|
||||
|
||||
namespace ZeroMQ {
|
||||
|
||||
#if !defined BOOST_ASIO_WINDOWS
|
||||
struct StreamDescriptorClose {
|
||||
void operator()(boost::asio::posix::stream_descriptor *descriptor) {
|
||||
descriptor->release();
|
||||
delete descriptor;
|
||||
}
|
||||
};
|
||||
using StreamDescriptor = std::unique_ptr<boost::asio::posix::stream_descriptor, StreamDescriptorClose>;
|
||||
using StreamType = boost::asio::posix::stream_descriptor;
|
||||
using NativeHandleType = boost::asio::posix::stream_descriptor::native_handle_type;
|
||||
#else
|
||||
using StreamDescriptor = std::unique_ptr<boost::asio::ip::tcp::socket>;
|
||||
using StreamType = boost::asio::ip::tcp::socket;
|
||||
using NativeHandleType = boost::asio::ip::tcp::socket::native_handle_type;
|
||||
#endif
|
||||
|
||||
class SocketService : public boost::asio::io_context::service {
|
||||
public:
|
||||
inline static boost::asio::execution_context::id id;
|
||||
SocketService(boost::asio::io_context &context);
|
||||
~SocketService();
|
||||
|
||||
class Implementation {
|
||||
public:
|
||||
Implementation(void *context, SocketType type, boost::asio::io_context &ioContext);
|
||||
~Implementation();
|
||||
|
||||
void *socket;
|
||||
StreamDescriptor descriptor;
|
||||
std::mutex mutex;
|
||||
};
|
||||
using ImplementationType = std::shared_ptr<Implementation>;
|
||||
|
||||
void construct(ImplementationType &impl, SocketType type);
|
||||
|
||||
void destroy(ImplementationType &impl);
|
||||
|
||||
void connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error);
|
||||
void setOption(ImplementationType &impl, int option, const void *optval, size_t optvallen,
|
||||
boost::system::error_code &error);
|
||||
void option(const ImplementationType &impl, int option, void *optval, size_t *optvallen,
|
||||
boost::system::error_code &error);
|
||||
|
||||
/// Destroy all user-defined handler objects owned by the service.
|
||||
void shutdown() {}
|
||||
|
||||
private:
|
||||
void *m_context;
|
||||
};
|
||||
} // namespace ZeroMQ
|
||||
|
||||
#endif // SOCKETSERVICE_H
|
10
AsioZeroMQ/ZeroMQSocket.h
Normal file
10
AsioZeroMQ/ZeroMQSocket.h
Normal file
@ -0,0 +1,10 @@
|
||||
#ifndef SOCKET_H
|
||||
#define SOCKET_H
|
||||
|
||||
#include "BasicSocket.h"
|
||||
#include "SocketService.h"
|
||||
|
||||
namespace ZeroMQ {
|
||||
using Socket = BasicSocket<SocketService>;
|
||||
}
|
||||
#endif // SOCKET_H
|
@ -2,5 +2,9 @@ cmake_minimum_required(VERSION 3.15)
|
||||
|
||||
project(Kylin)
|
||||
|
||||
set(ZeroMQ_INCLUDE_DIR ${ZeroMQ_ROOT}/include)
|
||||
set(ZeroMQ_LIBRARY_DIRS ${ZeroMQ_ROOT}/lib)
|
||||
|
||||
add_subdirectory(AsioZeroMQ)
|
||||
add_subdirectory(HttpProxy)
|
||||
add_subdirectory(Universal)
|
@ -18,7 +18,8 @@ function cmake_scan() {
|
||||
-S ${base_path} \
|
||||
-B ${build_path} \
|
||||
-DCMAKE_BUILD_TYPE=Debug \
|
||||
-DBOOST_ROOT=${libraries_root}/boost_1_82_0
|
||||
-DBOOST_ROOT=${libraries_root}/boost_1_82_0 \
|
||||
-DZeroMQ_ROOT=${libraries_root}/zeromq-4.3.4_debug
|
||||
}
|
||||
|
||||
function build() {
|
||||
|
Loading…
Reference in New Issue
Block a user