9 #ifndef EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP
10 #define EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP
12 #include "../remote_node.hpp"
13 #include "../serialize.hpp"
14 #include "../subscriber.hpp"
25 template <
typename Base>
26 using node_tracker_base =
27 pinger<system_info_consumer<host_info_consumer<build_info_consumer<
28 endpoint_info_consumer<network_topology<subscriber_discovery<Base>>>>>>>;
30 template <
typename Base = subscriber>
31 class node_tracker :
public node_tracker_base<Base> {
33 using This = node_tracker;
34 using base = node_tracker_base<Base>;
45 some_true something_done{};
46 something_done(base::update());
48 if(_should_query_topology) {
49 this->discover_topology();
53 const bool should_query_info{_should_query_info};
55 _tracker.for_each_node_state([&](
auto node_id,
auto& node) {
56 if(should_query_info) {
58 this->query_host_id(node_id);
60 if(!node.has_endpoint_info()) {
61 this->query_endpoint_info(node_id);
63 if(!node.instance().build()) {
64 this->query_build_info(node_id);
66 if(node.is_responsive()) {
67 if(auto host = node.host_state()) {
69 this->query_hostname(node_id);
71 if(!host.cpu_concurrent_threads()) {
72 this->query_cpu_concurrent_threads(node_id);
74 if(!host.total_ram_size()) {
75 this->query_total_ram_size(node_id);
77 if(!host.total_swap_size()) {
78 this->query_total_swap_size(node_id);
80 const bool should_query_sensors =
81 host.should_query_sensors();
82 if(should_query_sensors) {
83 this->query_short_average_load(node_id);
84 this->query_long_average_load(node_id);
85 this->query_free_ram_size(node_id);
86 this->query_free_swap_size(node_id);
87 host.sensors_queried();
93 if(node.is_pingable()) {
94 const auto [should_ping, max_time] = node.should_ping();
96 this->ping(node_id, max_time);
101 _handle_node_change(node_id, node);
104 return something_done;
107 template <
typename Function>
108 void for_each_node(Function
function) {
109 _tracker.for_each_node(std::move(
function));
112 virtual void on_node_change(remote_node& node, remote_node_changes) = 0;
115 resetting_timeout _should_query_topology{std::chrono::seconds{15},
nothing};
116 resetting_timeout _should_query_info{std::chrono::seconds{10}};
118 remote_node_tracker _tracker{};
121 return _tracker.get_host(
id);
124 auto _get_instance(
identifier_t id) -> remote_instance_state& {
125 return _tracker.get_instance(
id);
129 return _tracker.get_node(
id);
133 -> node_connection_state& {
134 return _tracker.get_connection(id1, id2);
137 void _handle_node_change(
identifier_t node_id, remote_node_state& node) {
138 if(
const auto changes{node.changes()}) {
139 on_node_change(node, changes);
140 if(EAGINE_UNLIKELY(changes.new_instance())) {
141 this->query_endpoint_info(node_id);
142 this->query_host_id(node_id);
143 this->query_hostname(node_id);
144 this->query_subscriptions_of(node_id);
149 void is_alive(
const subscriber_info& info)
final {
150 _tracker.notice_instance(
info.endpoint_id,
info.instance_id);
153 void on_subscribed(
const subscriber_info& info, message_id msg_id)
final {
154 _tracker.notice_instance(
info.endpoint_id,
info.instance_id)
155 .add_subscription(msg_id);
158 void on_unsubscribed(
const subscriber_info& info, message_id msg_id)
final {
159 _tracker.notice_instance(
info.endpoint_id,
info.instance_id)
160 .remove_subscription(msg_id);
163 void not_subscribed(
const subscriber_info& info, message_id msg_id)
final {
164 _tracker.notice_instance(
info.endpoint_id,
info.instance_id)
165 .remove_subscription(msg_id);
168 void router_appeared(
const router_topology_info& info)
final {
169 _tracker.notice_instance(
info.router_id,
info.instance_id)
170 .assign(node_kind::router);
172 _get_connection(
info.router_id,
info.remote_id)
173 .set_kind(
info.connect_kind);
177 void bridge_appeared(
const bridge_topology_info& info)
final {
178 _tracker.notice_instance(
info.bridge_id,
info.instance_id)
179 .assign(node_kind::bridge);
180 if(
info.opposite_id) {
181 _get_connection(
info.bridge_id,
info.opposite_id)
182 .set_kind(connection_kind::remote_interprocess);
186 void endpoint_appeared(
const endpoint_topology_info& info)
final {
187 _tracker.notice_instance(
info.endpoint_id,
info.instance_id)
188 .assign(node_kind::endpoint);
191 void on_endpoint_info_received(
192 const result_context& ctx,
193 endpoint_info&& info)
final {
194 _get_node(ctx.source_id()).assign(std::move(info)).notice_alive();
197 void on_host_id_received(
198 const result_context& ctx,
199 valid_if_positive<host_id_t>&& host_id)
final {
201 _get_node(ctx.source_id())
207 void on_hostname_received(
208 const result_context& ctx,
209 valid_if_not_empty<std::string>&& hostname)
final {
211 auto& node = _get_node(ctx.source_id());
212 if(
auto host_id{node.host_id()}) {
213 auto& host = _get_host(
extract(host_id));
214 host.set_hostname(std::move(
extract(hostname))).notice_alive();
215 _tracker.for_each_host_node_state(
216 extract(host_id), [&](
auto,
auto& host_node) {
217 host_node.add_change(remote_node_change::host_info);
224 on_build_info_received(
const result_context& ctx, build_info&& info)
final {
225 auto& node = _get_node(ctx.source_id()).notice_alive();
226 if(
auto inst_id{node.instance_id()}) {
227 auto& inst = _get_instance(
extract(inst_id));
228 inst.assign(std::move(info));
229 _tracker.for_each_host_node_state(
230 extract(inst_id), [&](
auto,
auto& inst_node) {
231 inst_node.add_change(remote_node_change::build_info);
236 void on_cpu_concurrent_threads_received(
237 const result_context& ctx,
238 valid_if_positive<span_size_t>&& opt_value)
final {
240 auto& node = _get_node(ctx.source_id()).notice_alive();
241 if(
auto host_id{node.host_id()}) {
242 auto& host = _get_host(
extract(host_id)).notice_alive();
243 host.set_cpu_concurrent_threads(
extract(opt_value));
244 _tracker.for_each_host_node_state(
245 extract(host_id), [&](
auto,
auto& host_node) {
246 host_node.add_change(remote_node_change::hardware_config);
252 void on_short_average_load_received(
253 const result_context& ctx,
254 valid_if_nonnegative<float>&& opt_value)
final {
256 auto& node = _get_node(ctx.source_id()).notice_alive();
257 if(
auto host_id{node.host_id()}) {
258 auto& host = _get_host(
extract(host_id)).notice_alive();
259 host.set_short_average_load(
extract(opt_value));
260 _tracker.for_each_host_node_state(
261 extract(host_id), [&](
auto,
auto& host_node) {
262 host_node.add_change(remote_node_change::sensor_values);
268 void on_long_average_load_received(
269 const result_context& ctx,
270 valid_if_nonnegative<float>&& opt_value)
final {
272 auto& node = _get_node(ctx.source_id()).notice_alive();
273 if(
auto host_id{node.host_id()}) {
274 auto& host = _get_host(
extract(host_id)).notice_alive();
275 host.set_long_average_load(
extract(opt_value));
276 _tracker.for_each_host_node_state(
277 extract(host_id), [&](
auto,
auto& host_node) {
278 host_node.add_change(remote_node_change::sensor_values);
284 void on_free_ram_size_received(
285 const result_context& ctx,
286 valid_if_positive<span_size_t>&& opt_value)
final {
288 auto& node = _get_node(ctx.source_id()).notice_alive();
289 if(
auto host_id{node.host_id()}) {
290 auto& host = _get_host(
extract(host_id)).notice_alive();
291 host.set_free_ram_size(
extract(opt_value));
292 _tracker.for_each_host_node_state(
293 extract(host_id), [&](
auto,
auto& host_node) {
294 host_node.add_change(remote_node_change::sensor_values);
300 void on_total_ram_size_received(
301 const result_context& ctx,
302 valid_if_positive<span_size_t>&& opt_value)
final {
304 auto& node = _get_node(ctx.source_id()).notice_alive();
305 if(
auto host_id{node.host_id()}) {
306 auto& host = _get_host(
extract(host_id)).notice_alive();
307 host.set_total_ram_size(
extract(opt_value));
308 _tracker.for_each_host_node_state(
309 extract(host_id), [&](
auto,
auto& host_node) {
310 host_node.add_change(remote_node_change::hardware_config);
316 void on_free_swap_size_received(
317 const result_context& ctx,
318 valid_if_nonnegative<span_size_t>&& opt_value)
final {
320 auto& node = _get_node(ctx.source_id()).notice_alive();
321 if(
auto host_id{node.host_id()}) {
322 auto& host = _get_host(
extract(host_id)).notice_alive();
323 host.set_free_swap_size(
extract(opt_value));
324 _tracker.for_each_host_node_state(
325 extract(host_id), [&](
auto,
auto& host_node) {
326 host_node.add_change(remote_node_change::sensor_values);
332 void on_total_swap_size_received(
333 const result_context& ctx,
334 valid_if_nonnegative<span_size_t>&& opt_value)
final {
336 auto& node = _get_node(ctx.source_id()).notice_alive();
337 if(
auto host_id{node.host_id()}) {
338 auto& host = _get_host(
extract(host_id)).notice_alive();
339 host.set_total_swap_size(
extract(opt_value));
340 _tracker.for_each_host_node_state(
341 extract(host_id), [&](
auto,
auto& host_node) {
342 host_node.add_change(remote_node_change::hardware_config);
348 void on_ping_response(
351 std::chrono::microseconds age,
353 _get_node(node_id).ping_response(sequence_no, age);
356 void on_ping_timeout(
359 std::chrono::microseconds age)
final {
360 _get_node(node_id).ping_timeout(sequence_no, age);
366 #endif // EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP