OGLplus  (0.59.0) a C++ wrapper for rendering APIs

endpoint.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_ENDPOINT_HPP
10 #define EAGINE_MESSAGE_BUS_ENDPOINT_HPP
11 
12 #include "../application_config.hpp"
13 #include "../flat_map.hpp"
14 #include "../main_ctx_object.hpp"
15 #include "../timeout.hpp"
16 #include "blobs.hpp"
17 #include "connection.hpp"
18 #include "context_fwd.hpp"
19 #include "serialize.hpp"
20 #include <tuple>
21 
22 namespace eagine::msgbus {
23 //------------------------------------------------------------------------------
24 class friend_of_endpoint;
25 //------------------------------------------------------------------------------
30 class endpoint
31  : public connection_user
32  , public main_ctx_object {
33 public:
34  static constexpr auto invalid_id() noexcept -> identifier_t {
35  return 0U;
36  }
37 
39  static constexpr auto is_valid_id(identifier_t id) noexcept -> bool {
40  return id != invalid_id();
41  }
42 
45 
48 
50  endpoint(main_ctx_object obj) noexcept
51  : main_ctx_object{std::move(obj)} {}
52 
54  endpoint(identifier id, main_ctx_parent parent) noexcept
55  : main_ctx_object{id, parent} {}
56 
57  explicit endpoint(
58  main_ctx_object obj,
59  blob_filter_function allow_blob) noexcept
60  : main_ctx_object{std::move(obj)}
61  , _allow_blob{std::move(allow_blob)} {}
62 
64  endpoint(const endpoint&) = delete;
66  auto operator=(endpoint&&) = delete;
68  auto operator=(const endpoint&) = delete;
69 
72  auto ctx() noexcept -> context& {
73  EAGINE_ASSERT(_context);
74  return *_context;
75  }
76 
77  ~endpoint() noexcept override = default;
78 
84  auto set_id(identifier id) -> auto& {
85  _endpoint_id = id.value();
86  return *this;
87  }
88 
93  auto preconfigure_id(identifier_t id) -> auto& {
94  _preconfd_id = id;
95  return *this;
96  }
97 
102  auto has_preconfigured_id() const noexcept -> bool {
103  return is_valid_id(_preconfd_id);
104  }
105 
110  auto has_id() const noexcept -> bool {
111  return is_valid_id(_endpoint_id);
112  }
113 
118  auto get_preconfigured_id() const noexcept {
119  return _preconfd_id;
120  }
121 
126  auto get_id() const noexcept {
127  return _endpoint_id;
128  }
129 
133 
137 
139  auto add_connection(std::unique_ptr<connection> conn) -> bool final;
140 
142  auto is_usable() const -> bool;
143 
146 
148  void flush_outbox();
149 
151  auto update() -> bool;
152 
154  void finish() {
155  say_bye();
156  flush_outbox();
157  }
158 
160  void subscribe(message_id);
161 
163  void unsubscribe(message_id);
164 
165  auto set_next_sequence_id(message_id, message_info&) -> bool;
166 
171  auto post(message_id msg_id, message_view message) -> bool {
172  if(EAGINE_LIKELY(has_id())) {
173  return _do_send(msg_id, message);
174  }
175  _outgoing.push(msg_id, message);
176  return true;
177  }
178 
182  auto post_signed(message_id, message_view message) -> bool;
183 
188  template <typename T>
189  auto post_value(message_id msg_id, T& value, const message_info& info = {})
190  -> bool {
191  if(const auto opt_size = max_data_size()) {
192  const auto max_size = extract(opt_size);
193  return _outgoing.push_if(
194  [this, msg_id, &info, &value, max_size](
195  message_id& dst_msg_id, stored_message& message) {
196  if(message.store_value(value, max_size)) {
197  message.assign(info);
198  dst_msg_id = msg_id;
199  return true;
200  }
201  return false;
202  },
203  max_size);
204  }
205  return false;
206  }
207 
213  auto post_blob(
214  message_id msg_id,
215  identifier_t target_id,
216  memory::const_block blob,
217  std::chrono::seconds max_time,
218  message_priority priority) -> bool {
219  _blobs.push_outgoing(
220  msg_id, _endpoint_id, target_id, blob, max_time, priority);
221  return true;
222  }
223 
230  message_id msg_id,
231  memory::const_block blob,
232  std::chrono::seconds max_time,
233  message_priority priority) -> bool {
234  return post_blob(
235  msg_id, broadcast_endpoint_id(), blob, max_time, priority);
236  }
237 
244  message_id msg_id,
245  memory::const_block blob,
246  std::chrono::seconds max_time) -> bool {
247  return broadcast_blob(msg_id, blob, max_time, message_priority::normal);
248  }
249 
251  auto post_certificate(identifier_t target_id) -> bool;
252 
254  auto broadcast_certificate() -> bool;
255 
256  auto broadcast(message_id msg_id) -> bool {
257  return post(msg_id, {});
258  }
259 
262  auto say_not_a_router() -> bool;
263 
267  auto say_still_alive() -> bool;
268 
272  auto say_bye() -> bool;
273 
278  void post_meta_message(message_id meta_msg_id, message_id msg_id);
279 
285  identifier_t target_id,
286  message_id meta_msg_id,
287  message_id msg_id);
288 
293  void say_subscribes_to(message_id);
294 
299  void say_subscribes_to(identifier_t target_id, message_id);
300 
305  void say_unsubscribes_from(message_id);
306 
311  void say_not_subscribed_to(identifier_t target_id, message_id);
312 
316  void query_subscriptions_of(identifier_t target_id);
317 
321  void query_subscribers_of(message_id);
322 
326  void clear_block_list();
327 
331  void block_message_type(message_id);
332 
336  void clear_allow_list();
337 
341  void allow_message_type(message_id);
342 
344  void query_certificate_of(identifier_t endpoint_id);
345 
349  const message_info& info,
350  message_id msg_id,
351  message_view message) -> bool {
352  message.setup_response(info);
353  return post(msg_id, message);
354  }
355 
357  auto respond_to(const message_info& info, message_id msg_id) -> bool {
358  return respond_to(info, msg_id, {});
359  }
360 
364  using method_handler =
365  callable_ref<bool(const message_context&, stored_message&)>;
366 
370  auto process_one(message_id msg_id, method_handler handler) -> bool;
371 
375  template <
376  typename Class,
377  bool (Class::*MemFnPtr)(const message_context&, stored_message&)>
379  message_id msg_id,
381  bool (Class::*)(const message_context&, stored_message&),
382  MemFnPtr> method,
383  Class* instance) -> bool {
384  return process_one(msg_id, {instance, method});
385  }
386 
390  auto process_all(message_id msg_id, method_handler handler) -> span_size_t;
391 
394 
395 private:
396  friend class friend_of_endpoint;
397 
398  shared_context _context{make_context(*this)};
399 
400  const process_instance_id_t _instance_id{process_instance_id()};
401  identifier_t _preconfd_id{invalid_id()};
402  identifier_t _endpoint_id{invalid_id()};
403  timeout _no_id_timeout{
404  cfg_init("msg_bus.endpoint.no_id_timeout", std::chrono::seconds{2}),
405  nothing};
406  resetting_timeout _should_notify_alive{
407  cfg_init("msg_bus.endpoint.alive_notify_period", std::chrono::seconds{30}),
408  nothing};
409 
410  std::unique_ptr<connection> _connection{};
411 
412  message_storage _outgoing{};
413 
414  flat_map<message_id, std::tuple<span_size_t, message_priority_queue>>
415  _incoming{};
416 
417  template <typename Entry>
418  static inline auto _get_counter(Entry& entry) -> auto& {
419  return std::get<0>(std::get<1>(entry));
420  }
421 
422  template <typename Entry>
423  static inline auto _get_queue(Entry& entry) -> auto& {
424  return std::get<1>(std::get<1>(entry));
425  }
426 
427  blob_manipulator _blobs{*this};
428  blob_manipulator::filter_function _allow_blob{};
429 
430  auto _cleanup_blobs() -> bool;
431  auto _process_blobs() -> bool;
432  auto _do_allow_blob(message_id) -> bool;
433 
434  auto _default_store_handler() noexcept -> fetch_handler {
435  return fetch_handler{this, EAGINE_THIS_MEM_FUNC_C(_store_message)};
436  }
437 
438  fetch_handler _store_handler{_default_store_handler()};
439 
440  auto _do_send(message_id msg_id, message_view) -> bool;
441 
442  auto
443  _handle_send(message_id msg_id, message_age, const message_view& message)
444  -> bool {
445  // TODO: use message age
446  return _do_send(msg_id, message);
447  }
448 
449  auto _handle_post(message_id msg_id, const message_view& message) -> bool {
450  return post(msg_id, message);
451  }
452 
453  auto _handle_special(message_id msg_id, const message_view&) noexcept
454  -> bool;
455 
456  auto _store_message(message_id msg_id, message_age, const message_view&)
457  -> bool;
458 
459  auto _accept_message(message_id msg_id, const message_view&) -> bool;
460 
461  explicit endpoint(main_ctx_object obj, fetch_handler store_message) noexcept
462  : main_ctx_object{std::move(obj)}
463  , _store_handler{std::move(store_message)} {}
464 
465  explicit endpoint(
466  main_ctx_object obj,
467  blob_filter_function allow_blob,
468  fetch_handler store_message) noexcept
469  : main_ctx_object{std::move(obj)}
470  , _allow_blob{std::move(allow_blob)}
471  , _store_handler{std::move(store_message)} {}
472 
473  endpoint(endpoint&& temp) noexcept
474  : main_ctx_object{static_cast<main_ctx_object&&>(temp)}
475  , _context{std::move(temp._context)}
476  , _preconfd_id{std::exchange(temp._preconfd_id, invalid_id())}
477  , _endpoint_id{std::exchange(temp._endpoint_id, invalid_id())}
478  , _connection{std::move(temp._connection)}
479  , _outgoing{std::move(temp._outgoing)}
480  , _incoming{std::move(temp._incoming)}
481  , _blobs{std::move(temp._blobs)} {}
482 
483  endpoint(
484  endpoint&& temp,
485  blob_filter_function allow_blob,
486  fetch_handler store_message) noexcept
487  : main_ctx_object{static_cast<main_ctx_object&&>(temp)}
488  , _context{std::move(temp._context)}
489  , _preconfd_id{std::exchange(temp._preconfd_id, invalid_id())}
490  , _endpoint_id{std::exchange(temp._endpoint_id, invalid_id())}
491  , _connection{std::move(temp._connection)}
492  , _outgoing{std::move(temp._outgoing)}
493  , _incoming{std::move(temp._incoming)}
494  , _blobs{std::move(temp._blobs)}
495  , _allow_blob{std::move(allow_blob)}
496  , _store_handler{std::move(store_message)} {}
497 };
498 //------------------------------------------------------------------------------
/// Base for classes that need access to endpoint internal functionality.
/// Derived classes get protected helpers that reach the private
/// constructors and the private _accept_message function of endpoint.
501 class friend_of_endpoint {
502 protected:
    /// Makes an endpoint that uses the given message store handler.
503  static auto _make_endpoint(
504  main_ctx_object obj,
505  endpoint::fetch_handler store_message) noexcept {
506  return endpoint{std::move(obj), store_message};
507  }
508 
    /// Makes an endpoint that uses the given BLOB type filter
    /// and message store handler.
509  static auto _make_endpoint(
510  main_ctx_object obj,
511  endpoint::blob_filter_function allow_blob,
512  endpoint::fetch_handler store_message) noexcept {
513  return endpoint{std::move(obj), allow_blob, store_message};
514  }
515 
    /// Move-constructs an endpoint from `bus` with a default-constructed
    /// (empty) BLOB type filter and the given message store handler.
516  static auto _move_endpoint(
517  endpoint&& bus,
518  endpoint::fetch_handler store_message) noexcept {
519  return endpoint{std::move(bus), {}, store_message};
520  }
521 
    /// Move-constructs an endpoint from `bus` with the given BLOB type
    /// filter and message store handler.
522  static auto _move_endpoint(
523  endpoint&& bus,
524  endpoint::blob_filter_function allow_blob,
525  endpoint::fetch_handler store_message) noexcept {
526  return endpoint{std::move(bus), allow_blob, store_message};
527  }
528 
    /// Forwards to the private endpoint::_accept_message of `ep`.
529  inline auto _accept_message(
530  endpoint& ep,
531  message_id msg_id,
532  const message_view& message) -> bool {
533  return ep._accept_message(msg_id, message);
534  }
535 };
536 //------------------------------------------------------------------------------
537 } // namespace eagine::msgbus
538 
539 #if !EAGINE_LINK_LIBRARY || defined(EAGINE_IMPLEMENTING_LIBRARY)
540 #include <eagine/message_bus/endpoint.inl>
541 #endif
542 
543 #endif // EAGINE_MESSAGE_BUS_ENDPOINT_HPP
auto push_if(Function function, span_size_t req_size=0) -> bool
Pushes a new message and lets a function fill it.
Definition: message.hpp:475
void clear_allow_list()
Sends a message to router to clear its allow filter for this endpoint.
blob_manipulator::filter_function blob_filter_function
Alias for blob message type filter callable reference.
Definition: endpoint.hpp:47
Helper class used to initialize main context objects.
Definition: main_ctx_object.hpp:45
auto say_still_alive() -> bool
Posts a message saying that this endpoint is alive.
auto max_data_size() const -> valid_if_positive< span_size_t >
Returns the maximum data block size that the endpoint can send.
auto update() -> bool
Updates the internal state, sends and receives pending messages.
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
auto ctx() noexcept -> context &
Returns a reference to the message bus context.
Definition: endpoint.hpp:72
void query_subscriptions_of(identifier_t target_id)
Posts a message requesting all subscriptions of a target node.
Base class for main context objects.
Definition: main_ctx_object.hpp:71
auto preconfigure_id(identifier_t id) -> auto &
Preconfigures the unique id of this endpoint.
Definition: endpoint.hpp:93
auto get_preconfigured_id() const noexcept
Returns the preconfigured id of this endpoint.
Definition: endpoint.hpp:118
Structure storing information about a single message bus message.
Definition: message.hpp:119
void add_certificate_pem(memory::const_block blk)
Adds endpoint certificate in a PEM-encoded memory block.
static constexpr auto extract(api_result_value< Result, api_result_validity::never > &) noexcept -> Result &
Overload of extract for api_result_value.
Definition: c_api_wrap.hpp:270
auto has_preconfigured_id() const noexcept -> bool
Indicates if this endpoint has a preconfigured id (or should request one).
Definition: endpoint.hpp:102
@ info
Informational log entries.
Primary template for conditionally valid values.
Definition: decl.hpp:49
auto post_blob(message_id msg_id, identifier_t target_id, memory::const_block blob, std::chrono::seconds max_time, message_priority priority) -> bool
Enqueues a BLOB that is larger than max_data_size for sending.
Definition: endpoint.hpp:213
auto set_id(identifier id) -> auto &
Assigns the unique id of this endpoint.
Definition: endpoint.hpp:84
auto post_value(message_id msg_id, T &value, const message_info &info={}) -> bool
Serializes the specified value and enqueues it for sending in message.
Definition: endpoint.hpp:189
void post_meta_message_to(identifier_t target_id, message_id meta_msg_id, message_id msg_id)
Post a message with another message type as its content to target.
main_ctx_object(identifier obj_id, main_ctx_parent parent) noexcept
Initialization from object id and parent.
Definition: main_ctx_object.hpp:77
auto respond_to(const message_info &info, message_id msg_id, message_view message) -> bool
Posts a message as a response to another received message.
Definition: endpoint.hpp:348
void say_subscribes_to(message_id)
Broadcasts a message that this subscribes to message with given id.
auto get_id() const noexcept
Returns the unique id of this endpoint.
Definition: endpoint.hpp:126
void unsubscribe(message_id)
Unsubscribes from messages with the specified id/type.
void clear_block_list()
Sends a message to router to clear its block filter for this endpoint.
Class holding common message bus utility objects.
Definition: context.hpp:37
auto cfg_init(string_view key, T initial, string_view tag={}) -> T
Reads and returns the configuration value identified by key.
Definition: main_ctx_object.hpp:96
auto broadcast_blob(message_id msg_id, memory::const_block blob, std::chrono::seconds max_time, message_priority priority) -> bool
Enqueues a BLOB that is larger than max_data_size for broadcast.
Definition: endpoint.hpp:229
static constexpr auto broadcast_endpoint_id() noexcept -> identifier_t
Returns the special broadcast message bus endpoint id.
Definition: message.hpp:42
Interface for classes that can use message bus connections.
Definition: connection.hpp:155
void flush_outbox()
Sends any pending outgoing messages if possible.
callable_ref< bool(const message_context &, stored_message &)> method_handler
Alias for callable handling received messages.
Definition: endpoint.hpp:365
std::uint32_t process_instance_id_t
Unique process identifier type (does not necessarily match to OS PID).
Definition: identifier_t.hpp:22
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
auto process_all(message_id msg_id, method_handler handler) -> span_size_t
Processes all received messages of specified type with a handler.
Non-owning view of a contiguous range of memory with ValueType elements.
Definition: flatten_fwd.hpp:16
auto post_certificate(identifier_t target_id) -> bool
Posts the certificate of this endpoint to the specified remote.
void say_not_subscribed_to(identifier_t target_id, message_id)
Posts a message that this is not subscribed to message with given type.
auto process_one(message_id msg_id, member_function_constant< bool(Class::*)(const message_context &, stored_message &), MemFnPtr > method, Class *instance) -> bool
Processes a single received message of specified type with a method.
Definition: endpoint.hpp:378
auto process_everything(method_handler handler) -> span_size_t
Processes all received messages regardless of type with a handler.
auto post_signed(message_id, message_view message) -> bool
Signs and enqueues a message with the specified id/type for sending.
message_priority
Message priority enumeration.
Definition: message.hpp:58
endpoint(main_ctx_object obj) noexcept
Construction with a reference to parent main context object.
Definition: endpoint.hpp:50
void query_certificate_of(identifier_t endpoint_id)
Sends a message requesting remote endpoint certificate.
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
@ normal
Normal, default message priority.
#define EAGINE_THIS_MEM_FUNC_C(FUNC)
Macro for creating object of member_function_constant in member functions.
Definition: mem_func_const.hpp:206
void say_unsubscribes_from(message_id)
Broadcasts a message that this unsubscribes from message with given type.
void allow_message_type(message_id)
Sends a message to router to start allowing message type for this endpoint.
void subscribe(message_id)
Subscribes to messages with the specified id/type.
auto process_instance_id() const noexcept -> process_instance_id_t
Returns the process id.
auto operator=(endpoint &&)=delete
Not move assignable.
auto has_id() const noexcept -> bool
Indicates if this endpoint has valid id (set manually or from the bus).
Definition: endpoint.hpp:110
auto broadcast_blob(message_id msg_id, memory::const_block blob, std::chrono::seconds max_time) -> bool
Enqueues a BLOB that is larger than max_data_size for broadcast.
Definition: endpoint.hpp:243
Message bus client endpoint that can send and receive messages.
Definition: endpoint.hpp:30
auto add_connection(std::unique_ptr< connection > conn) -> bool final
Adds a connection for communication with a message bus router.
callable_ref< bool(message_id, message_age, const message_view &)> fetch_handler
Alias for fetch handler callable reference type.
Definition: connection.hpp:118
void post_meta_message(message_id meta_msg_id, message_id msg_id)
Post a message with another message type as its content.
auto process_one(message_id msg_id, method_handler handler) -> bool
Processes a single received message of specified type with a handler.
void finish()
Says to the message bus that this endpoint is disconnecting.
Definition: endpoint.hpp:154
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
void push(message_id msg_id, const message_view &message)
Pushes a message into this storage.
Definition: message.hpp:464
void block_message_type(message_id)
Sends a message to router to start blocking message type for this endpoint.
auto respond_to(const message_info &info, message_id msg_id) -> bool
Posts a message as a response to another received message.
Definition: endpoint.hpp:357
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
void add_ca_certificate_pem(memory::const_block blk)
Adds CA certificate in a PEM-encoded memory block.
endpoint(identifier id, main_ctx_parent parent) noexcept
Construction with an endpoint id and parent main context object.
Definition: endpoint.hpp:54
Declaration of compile-time member function pointer wrappers.
Definition: mem_func_const.hpp:19
auto say_not_a_router() -> bool
Posts a message saying that this is not a router bus node.
Combines message information and an owned message content buffer.
Definition: message.hpp:316
auto is_usable() const -> bool
Tests if this has all prerequisites for sending and receiving messages.
auto say_bye() -> bool
Posts a message saying that this endpoint is about to disconnect.
void query_subscribers_of(message_id)
Posts a message requesting all subscribers of a given message type.
Base for classes that need access to endpoint internal functionality.
Definition: endpoint.hpp:501
auto post(message_id msg_id, message_view message) -> bool
Enqueues a message with the specified id/type for sending.
Definition: endpoint.hpp:171
std::chrono::duration< float > message_age
Alias for message age type.
Definition: message.hpp:54
static constexpr auto is_valid_id(identifier_t id) noexcept -> bool
Tests if the specified id is a valid endpoint id.
Definition: endpoint.hpp:39
auto broadcast_certificate() -> bool
Broadcasts the certificate of this endpoint to the whole bus.
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30
connection::fetch_handler fetch_handler
Alias for message fetch handler callable reference.
Definition: endpoint.hpp:44

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).