Go to the documentation of this file.
9 #ifndef EAGINE_MESSAGE_BUS_POSIX_MQUEUE_HPP
10 #define EAGINE_MESSAGE_BUS_POSIX_MQUEUE_HPP
12 #include "../bool_aggregate.hpp"
13 #include "../branch_predict.hpp"
14 #include "../main_ctx_object.hpp"
15 #include "../random_identifier.hpp"
16 #include "../serialize/block_sink.hpp"
17 #include "../serialize/block_source.hpp"
18 #include "../serialize/string_backend.hpp"
41 swap(_name, temp._name);
42 swap(_ihandle, temp._ihandle);
43 swap(_ohandle, temp._ohandle);
71 _name = std::move(name);
73 if(_name.front() !=
'/') {
74 _name.insert(_name.begin(),
'/');
80 static auto name_from(
identifier id) -> std::string {
83 id.name().str(result);
103 char buf[128] = {
'\0'};
104 ::strerror_r(_last_errno,
static_cast<char*
>(buf),
sizeof(buf));
105 return {
static_cast<const char*
>(buf)};
114 return _last_errno != 0;
120 return (_last_errno == EAGAIN) || (_last_errno == ETIMEDOUT);
125 constexpr
auto is_open() const noexcept ->
bool {
126 return (_ihandle >= 0) && (_ohandle >= 0);
142 ::mq_unlink((_name +
"0").
c_str());
143 ::mq_unlink((_name +
"1").
c_str());
155 _ihandle = ::mq_open(
156 (_name +
"1").
c_str(),
158 O_RDONLY | O_CREAT | O_EXCL | O_NONBLOCK,
165 _ohandle = ::mq_open(
166 (_name +
"0").
c_str(),
168 O_WRONLY | O_CREAT | O_EXCL | O_NONBLOCK,
184 _ihandle = ::mq_open(
185 (_name +
"0").
c_str(),
187 O_RDONLY | O_NONBLOCK,
194 _ohandle = ::mq_open(
195 (_name +
"1").
c_str(),
197 O_WRONLY | O_NONBLOCK,
212 ::mq_close(_ihandle);
213 ::mq_close(_ohandle);
214 _ihandle = _invalid_handle();
215 _ohandle = _invalid_handle();
221 constexpr
static auto default_data_size() noexcept ->
span_size_t {
229 struct ::mq_attr attr {};
231 ::mq_getattr(_ohandle, &attr);
244 auto send(
unsigned priority, span<const char> blk) ->
auto& {
247 ::mq_send(_ohandle, blk.data(),
std_size(blk.size()), priority);
260 unsigned priority{0U};
263 ::mq_receive(_ihandle, blk.data(), blk.size(), &priority);
266 handler(priority,
head(blk, received));
275 static constexpr
auto _invalid_handle() noexcept -> ::mqd_t {
279 ::mqd_t _ihandle{_invalid_handle()};
280 ::mqd_t _ohandle{_invalid_handle()};
287 template <
typename Base>
325 auto open(std::string name) ->
bool {
326 return !_data_queue.
set_name(std::move(name)).open().had_error();
334 return {_buffer.
size()};
338 std::unique_lock lock{_mutex};
340 something_done(_receive());
341 something_done(_send());
342 return something_done;
346 std::unique_lock lock{_mutex};
351 _outgoing.push(sink.
done());
358 std::unique_lock lock{_mutex};
365 if(connect_queue.is_usable()) {
374 _buffer.
resize(connect_queue.data_size());
379 EAGINE_MSGBUS_ID(pmqConnect),
380 message_view(_data_queue.
get_name()),
383 connect_queue.send(1,
as_chars(sink.done()));
390 return something_done;
393 auto _receive() ->
bool {
394 some_true something_done{};
405 return something_done;
408 auto _send() ->
bool {
410 return _outgoing.fetch_all(
418 return !_data_queue.
send(1,
as_chars(data)).had_error();
421 void _handle_receive(
unsigned, memory::span<const char> data) {
425 block_data_source source(
as_bytes(data));
426 string_deserializer_backend
backend(source);
433 memory::buffer _buffer;
434 message_storage _incoming;
435 serialized_message_storage _outgoing;
436 posix_mqueue _data_queue{};
437 std::default_random_engine _rand_eng{std::random_device{}()};
454 , _connect_queue{std::move(name)} {}
459 , _connect_queue{posix_mqueue::name_from(
id)} {}
471 std::unique_lock lock{_mutex};
473 something_done(_checkup());
474 something_done(_receive());
475 something_done(_send());
476 return something_done;
480 auto _checkup() ->
bool {
483 _connect_queue.
close();
484 _connect_queue.
open();
487 return posix_mqueue_connection::_checkup(_connect_queue) &&
491 posix_mqueue _connect_queue{};
509 , _accept_queue{std::move(name)} {
528 something_done(_checkup());
529 something_done(_receive());
530 return something_done;
534 return _process(handler);
538 auto _checkup() ->
bool {
541 _accept_queue.
close();
543 if(!_accept_queue.
create().had_error()) {
548 return something_done;
551 auto _receive() ->
bool {
552 some_true something_done{};
563 return something_done;
566 void _handle_receive(
unsigned, memory::span<const char> data) {
570 block_data_source source(
as_bytes(data));
571 string_deserializer_backend
backend(source);
574 if(EAGINE_LIKELY(msg_id.has_method(
EAGINE_ID(pmqConnect)))) {
583 auto fetch_handler = [
this, &handler](
586 const message_view&
message) ->
bool {
587 EAGINE_ASSERT((msg_id == EAGINE_MSGBUS_ID(pmqConnect)));
588 EAGINE_MAYBE_UNUSED(msg_id);
590 if(
auto conn = std::make_unique<posix_mqueue_connection>(*
this)) {
592 handler(std::move(conn));
600 memory::buffer _buffer{};
601 message_storage _requests{};
602 posix_mqueue _accept_queue{};
622 return std::make_unique<posix_mqueue_acceptor>(
628 -> std::unique_ptr<connection>
final {
629 return std::make_unique<posix_mqueue_connector>(
636 #endif // EAGINE_MESSAGE_BUS_POSIX_MQUEUE_HPP
Class wrapping a POSIX message queue.
Definition: posix_mqueue.hpp:33
auto push_if(Function function, span_size_t req_size=0) -> bool
Pushes a new message and lets a function fill it.
Definition: message.hpp:475
auto error_message() const -> std::string
Returns the error message of the last failed operation.
Definition: posix_mqueue.hpp:101
auto make_acceptor()
Make a new acceptor listening on a default address.
Definition: conn_factory.hpp:34
Helper class used to initialize main context objects.
Definition: main_ctx_object.hpp:45
auto open(std::string name) -> bool
Opens the connection.
Definition: posix_mqueue.hpp:325
posix_mqueue_connector(main_ctx_parent parent, identifier id)
Construction from parent main context object and queue identifier.
Definition: posix_mqueue.hpp:457
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
static constexpr auto as_bytes(basic_span< T, P, S > spn) noexcept -> basic_block< std::is_const_v< T >>
Converts a span into a basic_block.
Definition: block.hpp:39
auto max_data_size() -> valid_if_positive< span_size_t >
Returns the absolute maximum block size that can be sent in a message.
Definition: posix_mqueue.hpp:227
auto get_name() const noexcept -> string_view
Returns the unique name of this queue.
Definition: posix_mqueue.hpp:64
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
constexpr auto is_open() const noexcept -> bool
Indicates if this message queue is open.
Definition: posix_mqueue.hpp:125
static constexpr auto c_str(memory::basic_span< C, P, S > s) -> std::enable_if_t< std::is_convertible_v< memory::basic_span< C, P, S >, basic_string_span< C, P, S >>, basic_c_str< C, P, S >>
Functions that construct a basic_c_str from a basic_string_span.
Definition: string_span.hpp:226
static constexpr auto span_size(T v) noexcept
Converts argument to span size type.
Definition: types.hpp:59
any_random_engine(Engine &) -> any_random_engine< typename Engine::result_type >
Deduction guide for any_random_engine.
Base class for main context objects.
Definition: main_ctx_object.hpp:71
Implementation of the connection_info interface for POSIX queue connection.
Definition: posix_mqueue.hpp:288
auto make_acceptor(string_view address) -> std::unique_ptr< acceptor > final
Makes a connection acceptor listening on the queue with the specified name.
Definition: posix_mqueue.hpp:621
static constexpr auto is_special_message(message_id msg_id) noexcept
Indicates if the specified message id denotes a special message bus message.
Definition: message.hpp:36
auto fetch_all(fetch_handler handler) -> bool
Fetches all currently stored messages and calls handler on them.
auto send(unsigned priority, span< const char > blk) -> auto &
Sends a block of data with the specified priority.
Definition: posix_mqueue.hpp:244
static constexpr auto cover(T *addr, S size) noexcept -> span_if_mutable< T >
Creates a span starting at the specified pointer and specified length.
Definition: span.hpp:465
basic_block< true > const_block
Alias for const byte memory span.
Definition: block.hpp:32
@ local_interprocess
Inter-process connection for local communication.
Primary template for conditionally valid values.
Definition: decl.hpp:49
Implementation of connection_factory for POSIX message queue connections.
Definition: posix_mqueue.hpp:609
Serialization data sink backed by a pre-allocated memory block.
Definition: block_sink.hpp:23
std::chrono::steady_clock::time_point message_timestamp
Alias for message timestamp type.
Definition: message.hpp:49
auto serialize_message(message_id msg_id, const message_view &msg, Backend &backend) -> std::enable_if_t< std::is_base_of_v< serializer_backend, Backend >, serialization_errors >
Serializes a bus message with the specified serializer backend.
Definition: serialize.hpp:83
auto done() const noexcept -> memory::block
Returns the part of the backing block already written to.
Definition: block_sink.hpp:43
static constexpr auto head(basic_span< T, P, S > s, L l) noexcept -> basic_span< T, P, S >
Returns the first l elements from the front of a span.
Definition: span_algo.hpp:99
static constexpr auto max_size() noexcept -> size_type
Returns the maximum length of this identifier type.
Definition: identifier.hpp:223
posix_mqueue_connection_factory(main_ctx_parent parent)
Construction from parent main context object.
Definition: posix_mqueue.hpp:614
auto receive(memory::span< char > blk, receive_handler handler) -> auto &
Receives messages and calls the specified handler on them.
Definition: posix_mqueue.hpp:258
auto set_name(identifier id) -> auto &
Sets the unique name of the queue.
Definition: posix_mqueue.hpp:89
callable_ref< void(unsigned, span< const char >)> receive_handler
Alias for received message handler.
Definition: posix_mqueue.hpp:255
constexpr auto is_usable() const noexcept -> bool
Indicates if this message queue can be used.
Definition: posix_mqueue.hpp:132
auto update() -> bool final
Updates the internal state of the connection (called repeatedly).
Definition: posix_mqueue.hpp:470
auto make_connector()
Make a new connector connecting to the specified address.
Definition: conn_factory.hpp:40
auto max_data_size() -> valid_if_positive< span_size_t > final
Returns the maximum data block size in bytes that can be sent.
Definition: posix_mqueue.hpp:333
basic_address< false > address
Type alias for non-const memory address values.
Definition: address.hpp:203
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
static constexpr auto std_size(T v) noexcept
Converts argument to std size type.
Definition: types.hpp:52
Class storing initially false value and logically or-ing other values.
Definition: bool_aggregate.hpp:16
auto fetch_messages(fetch_handler handler) -> bool final
Fetch all enqueued messages that have been received since last fetch.
Definition: posix_mqueue.hpp:357
Non-owning view of a contiguous range of memory with ValueType elements.
Definition: flatten_fwd.hpp:16
auto backend() noexcept
Returns a pointer to the backend of this logger object.
Definition: logger.hpp:55
auto send(message_id msg_id, const message_view &message) -> bool final
Sends a message with the specified id.
Definition: posix_mqueue.hpp:345
auto open() -> auto &
Opens existing OS queue objects.
Definition: posix_mqueue.hpp:181
auto random_identifier(any_random_engine< std::uint32_t > engine) -> identifier
Creates a random identifier using the specified random engine.
posix_mqueue(std::string name)
Constructs the queue and sets the specified name.
Definition: posix_mqueue.hpp:95
posix_mqueue(posix_mqueue &&temp) noexcept
Move constructible.
Definition: posix_mqueue.hpp:39
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
#define EAGINE_THIS_MEM_FUNC_C(FUNC)
Macro for creating object of member_function_constant in member functions.
Definition: mem_func_const.hpp:206
auto deserialize_message(identifier &class_id, identifier &method_id, stored_message &msg, Backend &backend) -> std::enable_if_t< std::is_base_of_v< deserializer_backend, Backend >, deserialization_errors >
Deserializes a bus message with the specified deserializer backend.
Definition: serialize.hpp:138
@ message
Message protocol.
auto operator=(posix_mqueue &&temp)=delete
Not move assignable.
auto process_accepted(const accept_handler &handler) -> bool final
Lets the handler process the pending accepted connections.
Definition: posix_mqueue.hpp:533
connection_addr_kind
Message bus connection address kind enumeration.
Definition: connection.hpp:30
auto needs_retry() const -> bool
Indicates if a previous operation on the queue needs to be retried.
Definition: posix_mqueue.hpp:119
callable_ref< bool(message_id, message_age, const message_view &)> fetch_handler
Alias for fetch handler callable reference type.
Definition: connection.hpp:118
connection_kind
Message bus connection kind bits enumeration.
Definition: connection_kind.hpp:21
posix_mqueue_acceptor(main_ctx_parent parent, std::string name) noexcept
Construction from parent main context object and queue name.
Definition: posix_mqueue.hpp:507
Cross-platform implementation of serializer backend using ASCII-only strings.
Definition: string_backend.hpp:26
auto update() -> bool final
Updates the internal state of the acceptor (called repeatedly).
Definition: posix_mqueue.hpp:526
Basic template for limited length, packed string identifiers.
Definition: identifier.hpp:178
auto update() -> bool override
Updates the internal state of the connection (called repeatedly).
Definition: posix_mqueue.hpp:337
auto create() -> auto &
Creates new OS queue objects.
Definition: posix_mqueue.hpp:152
auto size() const noexcept -> span_size_t
Returns the size of the buffer in bytes.
Definition: buffer.hpp:81
Implementation of acceptor on top of POSIX message queues.
Definition: posix_mqueue.hpp:498
auto had_error() const -> bool
Indicates if a previous operation finished with an error.
Definition: posix_mqueue.hpp:113
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
constexpr posix_mqueue() noexcept=default
Default constructor.
posix_mqueue_connector(main_ctx_parent parent, std::string name) noexcept
Construction from parent main context object and queue name.
Definition: posix_mqueue.hpp:452
auto data_size() noexcept -> span_size_t
Returns the maximum block size that can be sent in a message.
Definition: posix_mqueue.hpp:239
acceptor::accept_handler accept_handler
Alias for accepted connection handler callable.
Definition: posix_mqueue.hpp:504
Implementation of connection on top of POSIX message queues.
Definition: posix_mqueue.hpp:444
auto set_name(std::string name) -> auto &
Sets the unique name of the queue.
Definition: posix_mqueue.hpp:70
Implementation of connection on top of POSIX message queues.
Definition: posix_mqueue.hpp:310
auto resize(span_size_t new_size) -> auto &
Resizes the buffer to the specified number of bytes.
Definition: buffer.hpp:114
@ filepath
Filesystem path.
auto is_usable() -> bool final
Checks if the connection is in usable state.
Definition: posix_mqueue.hpp:329
static constexpr auto as_chars(block blk) noexcept
Converts a block into a span of characters.
Definition: block.hpp:48
auto unlink() -> auto &
Unlinks the OS queue objects.
Definition: posix_mqueue.hpp:140
callable_ref< void(std::unique_ptr< connection >)> accept_handler
Alias for accepted connection handler callable reference type.
Definition: acceptor.hpp:23
@ message_id
The message type id has been verified.
auto make_connector(string_view address) -> std::unique_ptr< connection > final
Makes a connector connecting to the queue with the specified name.
Definition: posix_mqueue.hpp:627
std::chrono::duration< float > message_age
Alias for message age type.
Definition: message.hpp:54
auto close() -> posix_mqueue &
Closes the OS queue objects.
Definition: posix_mqueue.hpp:210
posix_mqueue_connection(main_ctx_parent parent)
Construction from parent main context object.
Definition: posix_mqueue.hpp:319
posix_mqueue_acceptor(main_ctx_parent parent, identifier id)
Construction from parent main context object and queue identifier.
Definition: posix_mqueue.hpp:514