Go to the documentation of this file.
9 #ifndef EAGINE_MESSAGE_BUS_MESSAGE_HPP
10 #define EAGINE_MESSAGE_BUS_MESSAGE_HPP
12 #include "../assert.hpp"
13 #include "../bitfield.hpp"
14 #include "../callable_ref.hpp"
15 #include "../main_ctx_fwd.hpp"
16 #include "../memory/buffer_pool.hpp"
17 #include "../memory/copy.hpp"
18 #include "../memory/span_algo.hpp"
19 #include "../message_id.hpp"
20 #include "../reflect/map_enumerators.hpp"
21 #include "../serialize/size_and_data.hpp"
32 #define EAGINE_MSGBUS_ID(METHOD) EAGINE_MSG_ID(eagiMsgBus, METHOD)
37 return msg_id.has_class(
EAGINE_ID(eagiMsgBus));
76 using U = std::underlying_type_t<message_priority>;
80 template <
typename Selector>
82 enumerator_mapping(type_identity<message_priority>, Selector) noexcept {
83 return enumerator_map_type<message_priority, 5>{
106 template <
typename Selector>
109 return enumerator_map_type<message_crypto_flag, 3>{
120 static constexpr
auto invalid_id() noexcept ->
identifier_t {
173 auto assign(
const message_info& that) noexcept ->
auto& {
188 EAGINE_ASSERT(
hop_count < std::numeric_limits<hop_count_t>::max());
216 const float added_quarter_seconds = (
age.count() + 0.20F) * 4.F;
217 if(
auto new_age{convert_if_fits<age_t>(
325 , _buffer{std::move(buf)} {
331 return {*
this,
data()};
335 template <
typename Source>
338 source.fetch_all(_buffer);
346 template <
typename Backend,
typename Value>
347 auto do_store_value(
const Value& value,
span_size_t max_size) -> bool;
350 template <
typename Value>
353 template <
typename Backend,
typename Value>
354 auto do_fetch_value(Value& value) -> bool;
357 template <
typename Value>
362 return cover(_buffer);
367 return view(_buffer);
425 return std::move(_buffer);
450 _messages.reserve(64);
454 auto empty() const noexcept ->
bool {
455 return _messages.empty();
465 _messages.emplace_back(
474 template <
typename Function>
476 _messages.emplace_back(
480 auto& [msg_id,
message, insert_time] = _messages.back();
481 EAGINE_MAYBE_UNUSED(insert_time);
482 bool rollback =
false;
484 if(!
function(msg_id, insert_time,
message)) {
492 _messages.pop_back();
519 using _clock_t = std::chrono::steady_clock;
521 std::vector<std::tuple<message_id, stored_message, message_timestamp>>
525 class message_pack_info {
527 using bit_set = std::uint64_t;
530 : _total_size{limit_cast<std::uint16_t>(total_size)} {}
532 auto is_empty() const noexcept {
533 return _packed_bits == 0U;
536 auto bits() const noexcept -> bit_set {
542 auto bits = _packed_bits;
558 auto usage() const noexcept {
559 return float(used()) / float(total());
562 void add(
span_size_t msg_size, bit_set current_bit) noexcept {
563 _packed_size += limit_cast<std::uint16_t>(msg_size);
564 _packed_bits |= current_bit;
568 bit_set _packed_bits{0U};
569 std::uint16_t _packed_size{0};
570 const std::uint16_t _total_size{0};
573 class serialized_message_storage {
577 using fetch_handler =
580 serialized_message_storage() {
581 _messages.reserve(32);
584 auto empty() const noexcept ->
bool {
585 return _messages.empty();
593 if(!_messages.empty()) {
594 return view(std::get<0>(_messages.front()));
599 void pop() noexcept {
600 EAGINE_ASSERT(!_messages.empty());
601 _buffers.eat(std::move(std::get<0>(_messages.front())));
602 _messages.erase(_messages.begin());
606 EAGINE_ASSERT(!
message.empty());
607 auto buf = _buffers.get(
message.size());
609 _messages.emplace_back(std::move(buf), _clock_t::now());
612 auto fetch_all(fetch_handler handler) -> bool;
616 void cleanup(
const message_pack_info& to_be_removed);
619 using _clock_t = std::chrono::steady_clock;
620 memory::buffer_pool _buffers;
621 std::vector<std::tuple<memory::buffer, message_timestamp>> _messages;
626 class message_context {
628 message_context(endpoint& ep) noexcept
631 constexpr message_context(endpoint& ep, message_id mi) noexcept
633 , _msg_id{std::move(mi)} {}
635 auto bus() const noexcept -> endpoint& {
639 auto msg_id() const noexcept -> const message_id& {
643 auto set_msg_id(message_id msg_id) noexcept -> message_context& {
644 _msg_id = std::move(msg_id);
653 class message_priority_queue {
656 callable_ref<bool(
const message_context&, stored_message&)>;
658 message_priority_queue() {
659 _messages.reserve(128);
662 auto size() const noexcept {
663 return _messages.size();
666 auto push(
const message_view& message) -> stored_message& {
667 auto pos = std::lower_bound(
671 [](
auto& msg,
auto pri) { return msg.priority < pri; });
673 return *_messages.emplace(
674 pos, message, _buffers.get(
message.data.size()));
677 auto process_one(
const message_context& msg_ctx, handler_type handler)
679 if(!_messages.empty()) {
680 if(handler(msg_ctx, _messages.back())) {
681 _buffers.eat(_messages.back().release_buffer());
682 _messages.pop_back();
689 auto process_all(
const message_context& msg_ctx, handler_type handler)
693 while(pos < _messages.size()) {
694 if(handler(msg_ctx, _messages[pos])) {
696 _buffers.eat(_messages[pos].release_buffer());
697 _messages.erase(_messages.begin() + pos);
706 memory::buffer_pool _buffers;
707 std::vector<stored_message> _messages;
710 class connection_outgoing_messages {
713 return _serialized.count();
717 main_ctx_object& user,
723 return _serialized.pack_into(dest);
726 void cleanup(
const message_pack_info& packed) {
727 _serialized.cleanup(packed);
731 serialized_message_storage _serialized{};
734 class connection_incoming_messages {
736 using fetch_handler =
739 auto empty() const noexcept ->
bool {
740 return _packed.empty();
744 return _packed.count();
751 auto fetch_messages(main_ctx_object& user, fetch_handler handler) -> bool;
754 serialized_message_storage _packed{};
755 message_storage _unpacked{};
760 #if !EAGINE_LINK_LIBRARY || defined(EAGINE_IMPLEMENTING_LIBRARY)
761 #include <eagine/message_bus/message.inl>
764 #endif // EAGINE_MESSAGE_BUS_MESSAGE_HPP
auto push_if(Function function, span_size_t req_size=0) -> bool
Pushes a new message and lets a function fill it.
Definition: message.hpp:475
message_sequence_t sequence_t
Alias for the sequence number type.
Definition: message.hpp:141
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
@ high
High, sent before messages with lower priority.
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
@ idle
Idle, sent only when no messages with higher priority are enqueued.
static constexpr auto as_bytes(basic_span< T, P, S > spn) noexcept -> basic_block< std::is_const_v< T >>
Converts a span into a basic_block.
Definition: block.hpp:39
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
static constexpr auto span_size(T v) noexcept
Converts argument to span size type.
Definition: types.hpp:59
static auto copy_into(const_block source, buffer &dest) -> block
Copies the content of source block to destination buffer.
Definition: copy.hpp:33
Base class for main context objects.
Definition: main_ctx_object.hpp:71
bitfield< verification_bit > verification_bits
Alias for a bus message verification bitfield.
Definition: verification.hpp:47
auto content() noexcept -> memory::block
Returns a mutable view of the data content of the message.
Definition: message.hpp:389
static constexpr auto is_special_message(message_id msg_id) noexcept
Indicates if the specified message id denotes a special message bus message.
Definition: message.hpp:36
auto fetch_all(fetch_handler handler) -> bool
Fetches all currently stored messages and calls handler on them.
auto data() const noexcept -> memory::const_block
Returns a const view of the storage buffer.
Definition: message.hpp:366
auto store_value(const Value &value, span_size_t max_size) -> bool
Serializes and stores the specified value (up to max_size).
Definition: serialize.hpp:293
auto fetch_value(Value &value) -> bool
Deserializes the stored content into the specified value.
Definition: serialize.hpp:306
constexpr message_view() noexcept=default
Default constructor.
Structure storing information about a single message bus message.
Definition: message.hpp:119
static constexpr auto cover(T *addr, S size) noexcept -> span_if_mutable< T >
Creates a span starting at the specified pointer and specified length.
Definition: span.hpp:465
static constexpr auto extract(api_result_value< Result, api_result_validity::never > &) noexcept -> Result &
Overload of extract for api_result_value.
Definition: c_api_wrap.hpp:270
auto age() const noexcept -> message_age
Returns the message age.
Definition: message.hpp:229
basic_block< true > const_block
Alias for const byte memory span.
Definition: block.hpp:32
auto set_serializer_id(identifier id) noexcept -> auto &
Sets the id of the used data content serializer.
Definition: message.hpp:261
@ info
Informational log entries.
constexpr auto has(bit_type bit) const noexcept
Tests if the specified bit is set.
Definition: bitfield.hpp:70
std::chrono::steady_clock::time_point message_timestamp
Alias for message timestamp type.
Definition: message.hpp:49
static constexpr auto view(T *addr, S size) noexcept -> const_span< T >
Creates a view starting at the specified pointer and specified length.
Definition: span.hpp:458
@ endpoint
Message bus client endpoint.
hop_count_t hop_count
The message hop counter.
Definition: message.hpp:154
basic_callable_ref< Sig, is_noexcept_function_v< Sig > > callable_ref
Alias for callable object references.
Definition: callable_ref.hpp:191
Class storing multiple reusable memory buffer instances.
Definition: buffer_pool.hpp:21
auto get(span_size_t req_size=0) -> memory::buffer
Gets a buffer with the specified required size.
Definition: buffer_pool.hpp:26
basic_block< false > block
Alias for non-const byte memory span.
Definition: block.hpp:27
identifier_t source_id
Returns the source endpoint identifier.
Definition: message.hpp:127
auto verify_bits(context &, main_ctx_object &) const noexcept -> verification_bits
Verifies the signatures of this message.
memory::const_block data
View of the message data content.
Definition: message.hpp:291
void fetch_all_from(Source &source)
Copies the remaining data from the specified serialization source.
Definition: message.hpp:336
Class holding common message bus utility objects.
Definition: context.hpp:37
auto setup_response(const message_info &info) noexcept -> auto &
Sets the target id to be the source id from info, copies sequence number.
Definition: message.hpp:277
identifier_t target_id
Returns the target endpoint identifier.
Definition: message.hpp:132
identifier_t serializer_id
Returns the identifier of the used serializer.
Definition: message.hpp:137
stored_message(message_view message, memory::buffer buf) noexcept
Construction from a message view and storage buffer. Adopts the buffer and copies the content from the message view.
Definition: message.hpp:323
static constexpr auto broadcast_endpoint_id() noexcept -> identifier_t
Returns the special broadcast message bus endpoint id.
Definition: message.hpp:42
auto empty() const noexcept -> bool
Indicates if the storage is empty.
Definition: message.hpp:454
@ signed_content
The message content is signed.
sequence_t sequence_no
The message sequence number.
Definition: message.hpp:145
constexpr message_view(memory::block init) noexcept
Construction from a mutable memory block.
Definition: message.hpp:301
@ signed_header
The message header is signed.
auto count() const noexcept -> span_size_t
Returns the count of messages in the storage.
Definition: message.hpp:459
message_storage()
Default constructor.
Definition: message.hpp:449
Reallocatable owning byte buffer.
Definition: buffer.hpp:22
void cleanup(cleanup_predicate predicate)
Removes messages based on the result of the specified predicate.
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
auto has_serializer_id(identifier id) const noexcept -> bool
Tests if a data serializer with the specified id was used.
Definition: message.hpp:254
std::uint32_t message_sequence_t
Alias for message sequence number type.
Definition: types.hpp:22
Non-owning view of a contiguous range of memory with ValueType elements.
Definition: flatten_fwd.hpp:16
message_crypto_flag
Message cryptography-related flag bits enumeration.
Definition: message.hpp:94
static constexpr auto skip(basic_span< T, P, S > s, L l) noexcept -> basic_span< T, P, S >
Skips a specified count of elements from the front of a span.
Definition: span_algo.hpp:60
auto content() const noexcept -> memory::const_block
Returns a const view of the data content of the message.
Definition: message.hpp:399
callable_ref< bool(message_id, message_age, const message_view &)> fetch_handler
Alias for the message fetch handler.
Definition: message.hpp:504
auto clear() -> auto &
Clears the buffer.
Definition: buffer.hpp:149
@ critical
Critical, sent as soon as possible.
message_priority
Message priority enumeration.
Definition: message.hpp:58
void eat(memory::buffer used)
Returns the specified buffer back to the pool for further reuse.
Definition: buffer_pool.hpp:44
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
@ normal
Normal, default message priority.
auto signature() const noexcept -> memory::const_block
Returns the message signature.
Definition: message.hpp:380
auto set_sequence_no(message_sequence_t no) noexcept -> auto &
Sets the sequence number of this message (has message-type specific meaning).
Definition: message.hpp:268
auto storage() noexcept -> memory::block
Returns a mutable view of the storage buffer.
Definition: message.hpp:361
stored_message()=default
Default constructor.
@ message
Message protocol.
auto set_target_id(identifier_t id) noexcept -> auto &
Sets the target endpoint identifier.
Definition: message.hpp:246
constexpr message_view(string_view init) noexcept
Construction from a string view.
Definition: message.hpp:305
auto add_hop() noexcept -> auto &
Increments the hop counter.
Definition: message.hpp:187
message_crypto_flags crypto_flags
The message cryptography flags.
Definition: message.hpp:171
Class storing message bus messages.
Definition: message.hpp:446
void clear_data() noexcept
Clears the content of the storage buffer.
Definition: message.hpp:419
static auto get_data_with_size(memory::block src) noexcept -> memory::block
Extracts a sub-block from a larger mutable block with encoded sub-block size.
Definition: size_and_data.hpp:62
auto is_signed() const noexcept -> bool
Indicates if the header or the content is signed.
Definition: message.hpp:372
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
static auto skip_data_with_size(memory::const_block src) noexcept -> span_size_t
In a block starting with a sub-block with size, returns the size of the sub-block.
Definition: size_and_data.hpp:50
void push(message_id msg_id, const message_view &message)
Pushes a message into this storage.
Definition: message.hpp:464
Template type used mostly for function type-tag dispatching.
Definition: type_identity.hpp:19
auto store_and_sign(memory::const_block data, span_size_t max_size, context &, main_ctx_object &) -> bool
Stores the specified data and signs it.
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
auto text_content() const noexcept
Returns the content as a const string view.
Definition: message.hpp:414
auto set_source_id(identifier_t id) noexcept -> auto &
Sets the source endpoint identifier.
Definition: message.hpp:240
message_priority priority
The message priority.
Definition: message.hpp:168
auto add_age(message_age age) noexcept -> auto &
Adds to the age seconds counter.
Definition: message.hpp:215
constexpr message_view(message_info info, memory::const_block init) noexcept
Construction from a message info and a const memory block.
Definition: message.hpp:309
@ low
Low message priority.
Combines message information and an owned message content buffer.
Definition: message.hpp:316
std::int8_t age_t
Alias for type used to store the message age in quarter seconds.
Definition: message.hpp:157
@ asymmetric
Asymmetric cipher is used (symmetric otherwise).
static constexpr auto as_chars(block blk) noexcept
Converts a block into a span of characters.
Definition: block.hpp:48
auto too_many_hops() const noexcept -> bool
Indicates that the message made too many hops.
Definition: message.hpp:180
static constexpr auto operator<(const valid_if< T, P1 > &v1, const valid_if< T, P2 > &v2) noexcept -> tribool
Less-than comparison of two conditionally valid values.
Definition: decl.hpp:180
auto set_priority(message_priority new_priority) noexcept -> auto &
Sets the priority of this message.
Definition: message.hpp:234
auto release_buffer() noexcept -> memory::buffer
Releases and returns the storage buffer (without clearing it).
Definition: message.hpp:424
@ message_id
The message type id has been verified.
std::int8_t hop_count_t
Alias for type used to store the message hop count.
Definition: message.hpp:148
auto too_old() const noexcept -> bool
Indicates that the message is too old.
Definition: message.hpp:196
auto text_content() noexcept
Returns the content as a mutable string view.
Definition: message.hpp:408
std::chrono::duration< float > message_age
Alias for message age type.
Definition: message.hpp:54
age_t age_quarter_seconds
The message age in quarter seconds.
Definition: message.hpp:164
void store_content(memory::const_block blk)
Copies the content from the given block into the internal buffer.
Definition: message.hpp:342