OGLplus  (0.59.0) a C++ wrapper for rendering APIs

router.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_ROUTER_HPP
10 #define EAGINE_MESSAGE_BUS_ROUTER_HPP
11 
12 #include "../flat_map.hpp"
13 #include "../main_ctx_object.hpp"
14 #include "../timeout.hpp"
15 #include "../valid_if/positive.hpp"
16 #include "acceptor.hpp"
17 #include "blobs.hpp"
18 #include "context_fwd.hpp"
19 #include <map>
20 #include <vector>
21 
22 namespace eagine::msgbus {
23 //------------------------------------------------------------------------------
24 struct router_pending {
25 
26  router_pending(std::unique_ptr<connection> a_connection)
27  : the_connection{std::move(a_connection)} {}
28 
29  std::chrono::steady_clock::time_point create_time{
30  std::chrono::steady_clock::now()};
31 
32  std::unique_ptr<connection> the_connection{};
33 
34  auto age() const {
35  return std::chrono::steady_clock::now() - create_time;
36  }
37 };
38 //------------------------------------------------------------------------------
39 struct router_endpoint_info {
41  timeout is_outdated{std::chrono::seconds(60)};
42  std::vector<message_id> subscriptions{};
43  std::vector<message_id> unsubscriptions{};
44 
45  void assign_instance_id(const message_view& msg) {
46  is_outdated.reset();
47  if(instance_id != msg.sequence_no) {
48  instance_id = msg.sequence_no;
49  subscriptions.clear();
50  unsubscriptions.clear();
51  }
52  }
53 };
54 //------------------------------------------------------------------------------
55 struct routed_node {
56  std::unique_ptr<connection> the_connection{};
57  std::vector<message_id> message_block_list{};
58  std::vector<message_id> message_allow_list{};
59  bool maybe_router{true};
60  bool do_disconnect{false};
61 
62  routed_node();
63  routed_node(routed_node&&) noexcept = default;
64  routed_node(const routed_node&) = delete;
65  auto operator=(routed_node&&) noexcept -> routed_node& = default;
66  auto operator=(const routed_node&) -> routed_node& = delete;
67  ~routed_node() noexcept = default;
68 
69  void block_message(message_id);
70  void allow_message(message_id);
71 
72  auto is_allowed(message_id) const noexcept -> bool;
73 
74  auto send(main_ctx_object&, message_id, const message_view&) const -> bool;
75 };
76 //------------------------------------------------------------------------------
77 struct parent_router {
78  std::unique_ptr<connection> the_connection{};
79  identifier_t confirmed_id{0};
80  timeout confirm_id_timeout{std::chrono::seconds(2), nothing};
81 
82  void reset(std::unique_ptr<connection>);
83 
84  auto update(main_ctx_object&, identifier_t id_base) -> bool;
85 
86  template <typename Handler>
87  auto fetch_messages(main_ctx_object&, const Handler&) -> bool;
88 
89  auto send(main_ctx_object&, message_id, const message_view&) const -> bool;
90 };
91 //------------------------------------------------------------------------------
92 class router
93  : public main_ctx_object
94  , public acceptor_user
95  , public connection_user {
96 public:
97  router(main_ctx_parent parent) noexcept
98  : main_ctx_object{EAGINE_ID(MsgBusRutr), parent}
99  , _context{make_context(*this)} {
100  _setup_from_config();
101 
102  using std::to_string;
103  object_description(
104  "Router-" + to_string(_id_base),
105  "Message bus router id " + to_string(_id_base));
106  }
107 
108  void add_certificate_pem(memory::const_block blk);
109  void add_ca_certificate_pem(memory::const_block blk);
110 
111  auto add_acceptor(std::shared_ptr<acceptor>) -> bool final;
112  auto add_connection(std::unique_ptr<connection>) -> bool final;
113 
114  auto do_maintenance() -> bool;
115  auto do_work() -> bool;
116 
117  auto update(const valid_if_positive<int>& count) -> bool;
118  auto update() -> bool {
119  return update(2);
120  }
121 
122  void cleanup();
123 
124  auto no_connection_timeout() const noexcept -> auto& {
125  return _no_connection_timeout;
126  }
127 
128  auto is_done() const noexcept -> bool {
129  return bool(no_connection_timeout());
130  }
131 
132  void post_blob(
133  message_id msg_id,
134  identifier_t source_id,
135  identifier_t target_id,
136  memory::const_block blob,
137  std::chrono::seconds max_time,
138  message_priority priority) {
139  _blobs.push_outgoing(
140  msg_id, source_id, target_id, blob, max_time, priority);
141  }
142 
143 private:
144  void _setup_from_config();
145 
146  auto _handle_accept() -> bool;
147  auto _handle_pending() -> bool;
148  auto _remove_timeouted() -> bool;
149  auto _is_disconnected(identifier_t endpoint_id) -> bool;
150  auto _mark_disconnected(identifier_t endpoint_id) -> void;
151  auto _remove_disconnected() -> bool;
152  void _assign_id(std::unique_ptr<connection>& conn);
153  void _handle_connection(std::unique_ptr<connection> conn);
154 
155  auto _cleanup_blobs() -> bool;
156  auto _process_blobs() -> bool;
157  auto _do_allow_blob(message_id) -> bool;
158  auto _handle_blob(message_id, message_age, const message_view&) -> bool;
159 
160  auto _update_endpoint_info(identifier_t incoming_id, const message_view&)
161  -> router_endpoint_info&;
162 
163  auto _handle_special_common(
164  message_id msg_id,
165  identifier_t incoming_id,
166  const message_view&) -> bool;
167 
168  auto _handle_special(
169  message_id msg_id,
170  identifier_t incoming_id,
171  const message_view&) -> bool;
172 
173  auto _handle_special(
174  message_id msg_id,
175  identifier_t incoming_id,
176  routed_node&,
177  const message_view&) -> bool;
178 
179  auto _do_route_message(
180  message_id msg_id,
181  identifier_t incoming_id,
182  message_view& message) -> bool;
183 
184  auto _route_messages() -> bool;
185  auto _update_connections() -> bool;
186 
187  shared_context _context{};
188  const std::chrono::seconds _pending_timeout{30};
189  timeout _no_connection_timeout{std::chrono::seconds{30}};
190  const process_instance_id_t _instance_id{process_instance_id()};
191  identifier_t _id_base{0};
192  identifier_t _id_end{0};
193  identifier_t _id_sequence{0};
194  std::chrono::steady_clock::time_point _forwarded_since{
195  std::chrono::steady_clock::now()};
196  std::intmax_t _forwarded_messages{0};
197  std::intmax_t _dropped_messages{0};
198  float _message_age_sum{0.F};
199  parent_router _parent_router;
200  std::vector<std::shared_ptr<acceptor>> _acceptors;
201  std::vector<router_pending> _pending;
202  flat_map<identifier_t, routed_node> _nodes;
203  flat_map<identifier_t, identifier_t> _endpoint_idx;
204  flat_map<identifier_t, router_endpoint_info> _endpoint_infos;
205  flat_map<identifier_t, timeout> _recently_disconnected;
206  blob_manipulator _blobs{*this};
207 };
208 //------------------------------------------------------------------------------
209 } // namespace eagine::msgbus
210 
211 #if !EAGINE_LINK_LIBRARY || defined(EAGINE_IMPLEMENTING_LIBRARY)
212 #include <eagine/message_bus/router.inl>
213 #endif
214 
215 #endif // EAGINE_MESSAGE_BUS_ROUTER_HPP
const main_ctx_object_parent_info & main_ctx_parent
Alias for main_ctx_object_parent_info parameter type.
Definition: main_ctx_fwd.hpp:24
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
basic_block< true > const_block
Alias for const byte memory span.
Definition: block.hpp:32
std::uint32_t process_instance_id_t
Unique process identifier type (does not necessarily match to OS PID).
Definition: identifier_t.hpp:22
message_priority
Message priority enumeration.
Definition: message.hpp:58
eagine::msgbus
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
@ instance_id
The endpoint instance id has changed.
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
@ router
Message bus router.
std::chrono::duration< float > message_age
Alias for message age type.
Definition: message.hpp:54
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).