Go to the documentation of this file.
9 #ifndef EAGINE_MESSAGE_BUS_ENDPOINT_HPP
10 #define EAGINE_MESSAGE_BUS_ENDPOINT_HPP
12 #include "../application_config.hpp"
13 #include "../flat_map.hpp"
14 #include "../main_ctx_object.hpp"
15 #include "../timeout.hpp"
24 class friend_of_endpoint;
34 static constexpr
auto invalid_id() noexcept ->
identifier_t {
40 return id != invalid_id();
61 , _allow_blob{std::move(allow_blob)} {}
73 EAGINE_ASSERT(_context);
77 ~
endpoint() noexcept override = default;
85 _endpoint_id =
id.value();
139 auto add_connection(std::unique_ptr<connection> conn) ->
bool final;
172 if(EAGINE_LIKELY(
has_id())) {
173 return _do_send(msg_id,
message);
188 template <
typename T>
192 const auto max_size =
extract(opt_size);
194 [
this, msg_id, &info, &value, max_size](
196 if(
message.store_value(value, max_size)) {
197 message.assign(info);
217 std::chrono::seconds max_time,
219 _blobs.push_outgoing(
220 msg_id, _endpoint_id, target_id, blob, max_time, priority);
232 std::chrono::seconds max_time,
246 std::chrono::seconds max_time) ->
bool {
257 return post(msg_id, {});
286 message_id meta_msg_id,
383 Class* instance) ->
bool {
398 shared_context _context{make_context(*
this)};
403 timeout _no_id_timeout{
404 cfg_init(
"msg_bus.endpoint.no_id_timeout", std::chrono::seconds{2}),
406 resetting_timeout _should_notify_alive{
407 cfg_init(
"msg_bus.endpoint.alive_notify_period", std::chrono::seconds{30}),
410 std::unique_ptr<connection> _connection{};
412 message_storage _outgoing{};
414 flat_map<message_id, std::tuple<span_size_t, message_priority_queue>>
417 template <
typename Entry>
418 static inline auto _get_counter(Entry& entry) ->
auto& {
419 return std::get<0>(std::get<1>(entry));
422 template <
typename Entry>
423 static inline auto _get_queue(Entry& entry) ->
auto& {
424 return std::get<1>(std::get<1>(entry));
427 blob_manipulator _blobs{*
this};
428 blob_manipulator::filter_function _allow_blob{};
430 auto _cleanup_blobs() -> bool;
431 auto _process_blobs() -> bool;
432 auto _do_allow_blob(message_id) -> bool;
440 auto _do_send(message_id msg_id, message_view) -> bool;
443 _handle_send(message_id msg_id,
message_age,
const message_view& message)
446 return _do_send(msg_id, message);
449 auto _handle_post(message_id msg_id,
const message_view& message) ->
bool {
450 return post(msg_id, message);
453 auto _handle_special(message_id msg_id,
const message_view&) noexcept
456 auto _store_message(message_id msg_id,
message_age,
const message_view&)
459 auto _accept_message(message_id msg_id,
const message_view&) -> bool;
463 , _store_handler{std::move(store_message)} {}
470 , _allow_blob{std::move(allow_blob)}
471 , _store_handler{std::move(store_message)} {}
475 , _context{std::move(temp._context)}
476 , _preconfd_id{std::exchange(temp._preconfd_id, invalid_id())}
477 , _endpoint_id{std::exchange(temp._endpoint_id, invalid_id())}
478 , _connection{std::move(temp._connection)}
479 , _outgoing{std::move(temp._outgoing)}
480 , _incoming{std::move(temp._incoming)}
481 , _blobs{std::move(temp._blobs)} {}
488 , _context{std::move(temp._context)}
489 , _preconfd_id{std::exchange(temp._preconfd_id, invalid_id())}
490 , _endpoint_id{std::exchange(temp._endpoint_id, invalid_id())}
491 , _connection{std::move(temp._connection)}
492 , _outgoing{std::move(temp._outgoing)}
493 , _incoming{std::move(temp._incoming)}
494 , _blobs{std::move(temp._blobs)}
495 , _allow_blob{std::move(allow_blob)}
496 , _store_handler{std::move(store_message)} {}
503 static auto _make_endpoint(
506 return endpoint{std::move(obj), store_message};
509 static auto _make_endpoint(
513 return endpoint{std::move(obj), allow_blob, store_message};
516 static auto _move_endpoint(
519 return endpoint{std::move(bus), {}, store_message};
522 static auto _move_endpoint(
526 return endpoint{std::move(bus), allow_blob, store_message};
529 inline auto _accept_message(
533 return ep._accept_message(msg_id,
message);
539 #if !EAGINE_LINK_LIBRARY || defined(EAGINE_IMPLEMENTING_LIBRARY)
540 #include <eagine/message_bus/endpoint.inl>
543 #endif // EAGINE_MESSAGE_BUS_ENDPOINT_HPP
auto push_if(Function function, span_size_t req_size=0) -> bool
Pushes a new message and lets a function fill it.
Definition: message.hpp:475
void clear_allow_list()
Sends a message to router to clear its allow filter for this endpoint.
blob_manipulator::filter_function blob_filter_function
Alias for blob message type filter callable reference.
Definition: endpoint.hpp:47
Helper class used to initialize main context objects.
Definition: main_ctx_object.hpp:45
auto say_still_alive() -> bool
Posts a message saying that this endpoint is alive.
auto max_data_size() const -> valid_if_positive< span_size_t >
Returns the maximum data block size that the endpoint can send.
auto update() -> bool
Updates the internal state, sends and receives pending messages.
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
auto ctx() noexcept -> context &
Returns a reference to the message bus context.
Definition: endpoint.hpp:72
void query_subscriptions_of(identifier_t target_id)
Posts a message requesting all subscriptions of a target node.
Base class for main context objects.
Definition: main_ctx_object.hpp:71
auto preconfigure_id(identifier_t id) -> auto &
Preconfigures the unique id of this endpoint.
Definition: endpoint.hpp:93
auto get_preconfigured_id() const noexcept
Returns the preconfigured id of this endpoint.
Definition: endpoint.hpp:118
Structure storing information about a single message bus message.
Definition: message.hpp:119
void add_certificate_pem(memory::const_block blk)
Adds endpoint certificate in a PEM-encoded memory block.
static constexpr auto extract(api_result_value< Result, api_result_validity::never > &) noexcept -> Result &
Overload of extract for api_result_value.
Definition: c_api_wrap.hpp:270
auto has_preconfigured_id() const noexcept -> bool
Indicates if this endpoint has a preconfigured id (or should request one).
Definition: endpoint.hpp:102
@ info
Informational log entries.
Primary template for conditionally valid values.
Definition: decl.hpp:49
auto post_blob(message_id msg_id, identifier_t target_id, memory::const_block blob, std::chrono::seconds max_time, message_priority priority) -> bool
Enqueues a BLOB that is larger than max_data_size for sending.
Definition: endpoint.hpp:213
auto set_id(identifier id) -> auto &
Assigns the unique id of this endpoint.
Definition: endpoint.hpp:84
auto post_value(message_id msg_id, T &value, const message_info &info={}) -> bool
Serializes the specified value and enqueues it for sending in message.
Definition: endpoint.hpp:189
void post_meta_message_to(identifier_t target_id, message_id meta_msg_id, message_id msg_id)
Post a message with another message type as its content to target.
main_ctx_object(identifier obj_id, main_ctx_parent parent) noexcept
Initialization from object id and parent.
Definition: main_ctx_object.hpp:77
auto respond_to(const message_info &info, message_id msg_id, message_view message) -> bool
Posts a message as a response to another received message.
Definition: endpoint.hpp:348
void say_subscribes_to(message_id)
Broadcasts a message that this subscribes to message with given id.
auto get_id() const noexcept
Returns the unique id of this endpoint.
Definition: endpoint.hpp:126
void unsubscribe(message_id)
Unsubscribes from messages with the specified id/type.
void clear_block_list()
Sends a message to router to clear its block filter for this endpoint.
Class holding common message bus utility objects.
Definition: context.hpp:37
auto cfg_init(string_view key, T initial, string_view tag={}) -> T
Reads and returns the configuration value identified by key.
Definition: main_ctx_object.hpp:96
auto broadcast_blob(message_id msg_id, memory::const_block blob, std::chrono::seconds max_time, message_priority priority) -> bool
Enqueues a BLOB that is larger than max_data_size for broadcast.
Definition: endpoint.hpp:229
static constexpr auto broadcast_endpoint_id() noexcept -> identifier_t
Returns the special broadcast message bus endpoint id.
Definition: message.hpp:42
Interface for classes that can use message bus connections.
Definition: connection.hpp:155
void flush_outbox()
Sends any pending outgoing messages if possible.
callable_ref< bool(const message_context &, stored_message &)> method_handler
Alias for callable handling received messages.
Definition: endpoint.hpp:365
std::uint32_t process_instance_id_t
Unique process identifier type (does not necessarily match to OS PID).
Definition: identifier_t.hpp:22
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
auto process_all(message_id msg_id, method_handler handler) -> span_size_t
Processes all received messages of specified type with a handler.
Non-owning view of a contiguous range of memory with ValueType elements.
Definition: flatten_fwd.hpp:16
auto post_certificate(identifier_t target_id) -> bool
Posts the certificate of this endpoint to the specified remote.
void say_not_subscribed_to(identifier_t target_id, message_id)
Posts a message that this is not subscribed to message with given type.
auto process_one(message_id msg_id, member_function_constant< bool(Class::*)(const message_context &, stored_message &), MemFnPtr > method, Class *instance) -> bool
Processes a single received message of specified type with a method.
Definition: endpoint.hpp:378
auto process_everything(method_handler handler) -> span_size_t
Processes all received messages regardless of type with a handler.
auto post_signed(message_id, message_view message) -> bool
Signs and enqueues a message with the specified id/type for sending.
message_priority
Message priority enumeration.
Definition: message.hpp:58
endpoint(main_ctx_object obj) noexcept
Construction with a reference to parent main context object.
Definition: endpoint.hpp:50
void query_certificate_of(identifier_t endpoint_id)
Sends a message requesting remote endpoint certificate.
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
@ normal
Normal, default message priority.
#define EAGINE_THIS_MEM_FUNC_C(FUNC)
Macro for creating object of member_function_constant in member functions.
Definition: mem_func_const.hpp:206
void say_unsubscribes_from(message_id)
Broadcasts a message that this unsubscribes from message with given type.
void allow_message_type(message_id)
Sends a message to router to start allowing message type for this endpoint.
void subscribe(message_id)
Subscribes to messages with the specified id/type.
auto process_instance_id() const noexcept -> process_instance_id_t
Returns the process id.
auto operator=(endpoint &&)=delete
Not move assignable.
auto has_id() const noexcept -> bool
Indicates if this endpoint has valid id (set manually or from the bus).
Definition: endpoint.hpp:110
auto broadcast_blob(message_id msg_id, memory::const_block blob, std::chrono::seconds max_time) -> bool
Enqueues a BLOB that is larger than max_data_size for broadcast.
Definition: endpoint.hpp:243
@ message
Message protocol.
Message bus client endpoint that can send and receive messages.
Definition: endpoint.hpp:30
auto add_connection(std::unique_ptr< connection > conn) -> bool final
Adds a connection for communication with a message bus router.
callable_ref< bool(message_id, message_age, const message_view &)> fetch_handler
Alias for fetch handler callable reference type.
Definition: connection.hpp:118
void post_meta_message(message_id meta_msg_id, message_id msg_id)
Post a message with another message type as its content.
auto process_one(message_id msg_id, method_handler handler) -> bool
Processes a single received message of specified type with a handler.
void finish()
Says to the message bus that this endpoint is disconnecting.
Definition: endpoint.hpp:154
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
void push(message_id msg_id, const message_view &message)
Pushes a message into this storage.
Definition: message.hpp:464
void block_message_type(message_id)
Sends a message to router to start blocking message type for this endpoint.
auto respond_to(const message_info &info, message_id msg_id) -> bool
Posts a message as a response to another received message.
Definition: endpoint.hpp:357
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
void add_ca_certificate_pem(memory::const_block blk)
Adds CA certificate in a PEM-encoded memory block.
endpoint(identifier id, main_ctx_parent parent) noexcept
Construction with an endpoint id and parent main context object.
Definition: endpoint.hpp:54
Declaration of compile-time member function pointer wrappers.
Definition: mem_func_const.hpp:19
auto say_not_a_router() -> bool
Posts a message saying that this is not a router bus node.
Combines message information and an owned message content buffer.
Definition: message.hpp:316
auto is_usable() const -> bool
Tests if this has all prerequisites for sending and receiving messages.
auto say_bye() -> bool
Posts a message saying that this endpoint is about to disconnect.
void query_subscribers_of(message_id)
Posts a message requesting all subscribers of a given message type.
Base for classes that need access to endpoint internal functionality.
Definition: endpoint.hpp:501
auto post(message_id msg_id, message_view message) -> bool
Enqueues a message with the specified id/type for sending.
Definition: endpoint.hpp:171
std::chrono::duration< float > message_age
Alias for message age type.
Definition: message.hpp:54
static constexpr auto is_valid_id(identifier_t id) noexcept -> bool
Tests if the specified id is a valid endpoint id.
Definition: endpoint.hpp:39
auto broadcast_certificate() -> bool
Broadcasts the certificate of this endpoint to the whole bus.
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30
connection::fetch_handler fetch_handler
Alias for message fetch handler callable reference.
Definition: endpoint.hpp:44