9 #ifndef EAGINE_MESSAGE_BUS_ROUTER_HPP
10 #define EAGINE_MESSAGE_BUS_ROUTER_HPP
12 #include "../flat_map.hpp"
13 #include "../main_ctx_object.hpp"
14 #include "../timeout.hpp"
15 #include "../valid_if/positive.hpp"
// Pairs an owned connection with the steady-clock time at which this record
// was created.
// NOTE(review): this excerpt appears to be missing lines (the signature of the
// member that returns the elapsed time, and the closing braces) — confirm
// against the complete header before editing.
24 struct router_pending {
// Takes ownership of a_connection by moving it into the_connection.
26 router_pending(std::unique_ptr<connection> a_connection)
27 : the_connection{std::move(a_connection)} {}
// Default-initialized from steady_clock::now() when the object is constructed.
29 std::chrono::steady_clock::time_point create_time{
30 std::chrono::steady_clock::now()};
// The owned connection; empty unless set by the constructor above.
32 std::unique_ptr<connection> the_connection{};
// Steady-clock duration elapsed since create_time.
35 return std::chrono::steady_clock::now() - create_time;
// Per-endpoint bookkeeping: an expiry timeout plus the lists of message ids
// recorded as subscriptions and unsubscriptions.
// NOTE(review): the declaration of instance_id and part of the body of
// assign_instance_id are not visible in this excerpt.
39 struct router_endpoint_info {
// Timeout constructed with a 60 second period.
41 timeout is_outdated{std::chrono::seconds(60)};
42 std::vector<message_id> subscriptions{};
43 std::vector<message_id> unsubscriptions{};
// Compares the stored instance_id with msg.sequence_no; when they differ,
// both message-id lists are cleared.
// NOTE(review): presumably instance_id is also updated from msg.sequence_no
// on a line missing here — verify.
45 void assign_instance_id(
const message_view& msg) {
47 if(instance_id != msg.sequence_no) {
49 subscriptions.clear();
50 unsubscriptions.clear();
// Members of routed_node (the struct header is not visible in this excerpt).
// A routed_node owns a connection and carries per-node message block/allow
// lists and two status flags.
56 std::unique_ptr<connection> the_connection{};
57 std::vector<message_id> message_block_list{};
58 std::vector<message_id> message_allow_list{};
// Defaults to true.
59 bool maybe_router{
true};
// Defaults to false.
60 bool do_disconnect{
false};
// Move-only type: move operations are defaulted and noexcept, copy
// operations are deleted.
63 routed_node(routed_node&&) noexcept = default;
64 routed_node(const routed_node&) = delete;
65 auto operator=(routed_node&&) noexcept -> routed_node& = default;
66 auto operator=(const routed_node&) -> routed_node& = delete;
67 ~routed_node() noexcept = default;
// Declared here; definitions are not part of this excerpt.
// NOTE(review): presumably these maintain message_block_list and
// message_allow_list respectively — confirm in the implementation.
69 void block_message(message_id);
70 void allow_message(message_id);
// NOTE(review): presumably consults the block/allow lists — confirm.
72 auto is_allowed(message_id) const noexcept ->
bool;
// Const member returning bool; the meaning of the result is defined by the
// implementation, which is not visible here.
74 auto send(main_ctx_object&, message_id, const message_view&) const ->
bool;
// State for the connection to a parent router: the owned connection, a
// timeout, and operations declared below.
// NOTE(review): some members and the closing brace are not visible in this
// excerpt; all member functions here are declarations only.
77 struct parent_router {
78 std::unique_ptr<connection> the_connection{};
// Timeout with a 2 second period, constructed with the additional `nothing`
// argument. NOTE(review): the effect of `nothing` is defined by the timeout
// class (not shown) — presumably it starts the timeout as already elapsed.
80 timeout confirm_id_timeout{std::chrono::seconds(2),
nothing};
// Takes a connection by value (ownership is passed in).
82 void reset(std::unique_ptr<connection>);
84 auto update(main_ctx_object&,
identifier_t id_base) -> bool;
// Handler is taken by const reference; how it is invoked is defined in the
// implementation, which is not visible here.
86 template <
typename Handler>
87 auto fetch_messages(main_ctx_object&,
const Handler&) -> bool;
89 auto send(main_ctx_object&, message_id,
const message_view&)
const -> bool;
// Base-class list and constructor fragment of the router class (the class
// header and the constructor signature are not visible in this excerpt).
// The class derives publicly from main_ctx_object, acceptor_user and
// connection_user.
93 :
public main_ctx_object
94 ,
public acceptor_user
95 ,
public connection_user {
// Constructor initializer list: the main_ctx_object base receives the
// identifier EAGINE_ID(MsgBusRutr) and `parent`; _context is created by
// make_context(*this).
98 : main_ctx_object{
EAGINE_ID(MsgBusRutr), parent}
99 , _context{make_context(*
this)} {
100 _setup_from_config();
// Enables unqualified to_string calls below (std overloads plus any found
// by argument-dependent lookup).
102 using std::to_string;
// Two strings built from _id_base are passed as arguments to a call whose
// name is on a line missing from this excerpt.
// NOTE(review): presumably a short name and a description for this object.
104 "Router-" + to_string(_id_base),
105 "Message bus router id " + to_string(_id_base));
// Marked final. NOTE(review): presumably overrides virtuals declared in
// acceptor_user / connection_user — confirm in those interfaces.
// add_acceptor takes shared ownership; add_connection takes unique ownership.
111 auto add_acceptor(std::shared_ptr<acceptor>) ->
bool final;
112 auto add_connection(std::unique_ptr<connection>) ->
bool final;
// Declarations only; what the bool results mean is defined in the
// implementation, which is not visible here.
114 auto do_maintenance() -> bool;
115 auto do_work() -> bool;
// Overload taking a count wrapped in valid_if_positive<int>.
117 auto update(
const valid_if_positive<int>& count) -> bool;
// Parameterless overload; its inline body is not visible in this excerpt.
118 auto update() ->
bool {
// Returns a reference to the _no_connection_timeout member (const-qualified
// because the member function is const).
124 auto no_connection_timeout() const noexcept -> auto& {
125 return _no_connection_timeout;
// Reports the boolean conversion of the no-connection timeout.
// NOTE(review): presumably true once that timeout has expired — confirm
// against timeout's conversion to bool.
128 auto is_done() const noexcept ->
bool {
129 return bool(no_connection_timeout());
// Fragment of a function whose signature is not visible in this excerpt: it
// forwards msg_id, source_id, target_id, blob, max_time and priority to
// _blobs.push_outgoing.
137 std::chrono::seconds max_time,
139 _blobs.push_outgoing(
140 msg_id, source_id, target_id, blob, max_time, priority);
// Internal helper declarations; no definitions are visible in this excerpt.
// NOTE(review): presumably defined in eagine/message_bus/router.inl, which
// this header includes near its end — confirm.
// Called from the constructor.
144 void _setup_from_config();
// Connection lifecycle helpers. What each bool result means is defined in
// the implementation, which is not visible here.
146 auto _handle_accept() -> bool;
147 auto _handle_pending() -> bool;
148 auto _remove_timeouted() -> bool;
149 auto _is_disconnected(
identifier_t endpoint_id) -> bool;
150 auto _mark_disconnected(
identifier_t endpoint_id) -> void;
151 auto _remove_disconnected() -> bool;
// Takes the connection by non-const reference (ownership stays with caller).
152 void _assign_id(std::unique_ptr<connection>& conn);
// Takes the connection by value (ownership is passed in).
153 void _handle_connection(std::unique_ptr<connection> conn);
// BLOB handling helpers.
155 auto _cleanup_blobs() -> bool;
156 auto _process_blobs() -> bool;
157 auto _do_allow_blob(message_id) -> bool;
158 auto _handle_blob(message_id,
message_age,
const message_view&) -> bool;
// Returns a reference to a router_endpoint_info.
// NOTE(review): presumably the entry for incoming_id in _endpoint_infos.
160 auto _update_endpoint_info(
identifier_t incoming_id,
const message_view&)
161 -> router_endpoint_info&;
// Special-message handlers; parts of these parameter lists are missing from
// this excerpt. _handle_special is declared twice, so it is overloaded.
163 auto _handle_special_common(
166 const message_view&) -> bool;
168 auto _handle_special(
171 const message_view&) -> bool;
173 auto _handle_special(
177 const message_view&) -> bool;
// Takes the message by non-const reference, so it may be modified.
179 auto _do_route_message(
182 message_view& message) -> bool;
184 auto _route_messages() -> bool;
185 auto _update_connections() -> bool;
// Router state (data members). Some member declarations are missing from
// this excerpt, e.g. _id_base, which the constructor uses.
187 shared_context _context{};
// Constant 30 second value.
// NOTE(review): presumably the maximum age of entries in _pending.
188 const std::chrono::seconds _pending_timeout{30};
// Timeout with a 30 second period; exposed through no_connection_timeout()
// and is_done().
189 timeout _no_connection_timeout{std::chrono::seconds{30}};
// Forwarding statistics: start time (initialized to now), message counters
// and an accumulated message-age sum, all starting at zero.
194 std::chrono::steady_clock::time_point _forwarded_since{
195 std::chrono::steady_clock::now()};
196 std::intmax_t _forwarded_messages{0};
197 std::intmax_t _dropped_messages{0};
198 float _message_age_sum{0.F};
199 parent_router _parent_router;
200 std::vector<std::shared_ptr<acceptor>> _acceptors;
201 std::vector<router_pending> _pending;
// Lookup tables keyed by identifier_t.
// NOTE(review): presumably the keys are node/endpoint ids — confirm against
// the implementation.
202 flat_map<identifier_t, routed_node> _nodes;
203 flat_map<identifier_t, identifier_t> _endpoint_idx;
204 flat_map<identifier_t, router_endpoint_info> _endpoint_infos;
205 flat_map<identifier_t, timeout> _recently_disconnected;
// Constructed with a reference to this router object.
206 blob_manipulator _blobs{*
this};
211 #if !EAGINE_LINK_LIBRARY || defined(EAGINE_IMPLEMENTING_LIBRARY)
212 #include <eagine/message_bus/router.inl>
215 #endif // EAGINE_MESSAGE_BUS_ROUTER_HPP