OGLplus  (0.59.0) a C++ wrapper for rendering APIs

tracker.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP
10 #define EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP
11 
12 #include "../remote_node.hpp"
13 #include "../serialize.hpp"
14 #include "../subscriber.hpp"
15 #include "build_info.hpp"
16 #include "discovery.hpp"
17 #include "endpoint_info.hpp"
18 #include "host_info.hpp"
19 #include "ping_pong.hpp"
20 #include "system_info.hpp"
21 #include "topology.hpp"
22 
23 namespace eagine::msgbus {
24 //------------------------------------------------------------------------------
25 template <typename Base>
26 using node_tracker_base =
27  pinger<system_info_consumer<host_info_consumer<build_info_consumer<
28  endpoint_info_consumer<network_topology<subscriber_discovery<Base>>>>>>>;
29 //------------------------------------------------------------------------------
30 template <typename Base = subscriber>
31 class node_tracker : public node_tracker_base<Base> {
32 
33  using This = node_tracker;
34  using base = node_tracker_base<Base>;
35 
36 protected:
37  using base::base;
38 
39  void add_methods() {
40  base::add_methods();
41  }
42 
43 public:
44  auto update() -> bool {
45  some_true something_done{};
46  something_done(base::update());
47 
48  if(_should_query_topology) {
49  this->discover_topology();
50  something_done();
51  }
52 
53  const bool should_query_info{_should_query_info};
54 
55  _tracker.for_each_node_state([&](auto node_id, auto& node) {
56  if(should_query_info) {
57  if(!node.host_id()) {
58  this->query_host_id(node_id);
59  }
60  if(!node.has_endpoint_info()) {
61  this->query_endpoint_info(node_id);
62  }
63  if(!node.instance().build()) {
64  this->query_build_info(node_id);
65  }
66  if(node.is_responsive()) {
67  if(auto host = node.host_state()) {
68  if(!host.name()) {
69  this->query_hostname(node_id);
70  }
71  if(!host.cpu_concurrent_threads()) {
72  this->query_cpu_concurrent_threads(node_id);
73  }
74  if(!host.total_ram_size()) {
75  this->query_total_ram_size(node_id);
76  }
77  if(!host.total_swap_size()) {
78  this->query_total_swap_size(node_id);
79  }
80  const bool should_query_sensors =
81  host.should_query_sensors();
82  if(should_query_sensors) {
83  this->query_short_average_load(node_id);
84  this->query_long_average_load(node_id);
85  this->query_free_ram_size(node_id);
86  this->query_free_swap_size(node_id);
87  host.sensors_queried();
88  }
89  }
90  }
91  }
92 
93  if(node.is_pingable()) {
94  const auto [should_ping, max_time] = node.should_ping();
95  if(should_ping) {
96  this->ping(node_id, max_time);
97  node.pinged();
98  something_done();
99  }
100  }
101  _handle_node_change(node_id, node);
102  });
103 
104  return something_done;
105  }
106 
107  template <typename Function>
108  void for_each_node(Function function) {
109  _tracker.for_each_node(std::move(function));
110  }
111 
112  virtual void on_node_change(remote_node& node, remote_node_changes) = 0;
113 
114 private:
115  resetting_timeout _should_query_topology{std::chrono::seconds{15}, nothing};
116  resetting_timeout _should_query_info{std::chrono::seconds{10}};
117 
118  remote_node_tracker _tracker{};
119 
120  auto _get_host(identifier_t id) -> remote_host_state& {
121  return _tracker.get_host(id);
122  }
123 
124  auto _get_instance(identifier_t id) -> remote_instance_state& {
125  return _tracker.get_instance(id);
126  }
127 
128  auto _get_node(identifier_t id) -> remote_node_state& {
129  return _tracker.get_node(id);
130  }
131 
132  auto _get_connection(identifier_t id1, identifier_t id2)
133  -> node_connection_state& {
134  return _tracker.get_connection(id1, id2);
135  }
136 
137  void _handle_node_change(identifier_t node_id, remote_node_state& node) {
138  if(const auto changes{node.changes()}) {
139  on_node_change(node, changes);
140  if(EAGINE_UNLIKELY(changes.new_instance())) {
141  this->query_endpoint_info(node_id);
142  this->query_host_id(node_id);
143  this->query_hostname(node_id);
144  this->query_subscriptions_of(node_id);
145  }
146  }
147  }
148 
149  void is_alive(const subscriber_info& info) final {
150  _tracker.notice_instance(info.endpoint_id, info.instance_id);
151  }
152 
153  void on_subscribed(const subscriber_info& info, message_id msg_id) final {
154  _tracker.notice_instance(info.endpoint_id, info.instance_id)
155  .add_subscription(msg_id);
156  }
157 
158  void on_unsubscribed(const subscriber_info& info, message_id msg_id) final {
159  _tracker.notice_instance(info.endpoint_id, info.instance_id)
160  .remove_subscription(msg_id);
161  }
162 
163  void not_subscribed(const subscriber_info& info, message_id msg_id) final {
164  _tracker.notice_instance(info.endpoint_id, info.instance_id)
165  .remove_subscription(msg_id);
166  }
167 
168  void router_appeared(const router_topology_info& info) final {
169  _tracker.notice_instance(info.router_id, info.instance_id)
170  .assign(node_kind::router);
171  if(info.remote_id) {
172  _get_connection(info.router_id, info.remote_id)
173  .set_kind(info.connect_kind);
174  }
175  }
176 
177  void bridge_appeared(const bridge_topology_info& info) final {
178  _tracker.notice_instance(info.bridge_id, info.instance_id)
179  .assign(node_kind::bridge);
180  if(info.opposite_id) {
181  _get_connection(info.bridge_id, info.opposite_id)
182  .set_kind(connection_kind::remote_interprocess);
183  }
184  }
185 
186  void endpoint_appeared(const endpoint_topology_info& info) final {
187  _tracker.notice_instance(info.endpoint_id, info.instance_id)
188  .assign(node_kind::endpoint);
189  }
190 
191  void on_endpoint_info_received(
192  const result_context& ctx,
193  endpoint_info&& info) final {
194  _get_node(ctx.source_id()).assign(std::move(info)).notice_alive();
195  }
196 
197  void on_host_id_received(
198  const result_context& ctx,
199  valid_if_positive<host_id_t>&& host_id) final {
200  if(host_id) {
201  _get_node(ctx.source_id())
202  .set_host_id(extract(host_id))
203  .notice_alive();
204  }
205  }
206 
207  void on_hostname_received(
208  const result_context& ctx,
209  valid_if_not_empty<std::string>&& hostname) final {
210  if(hostname) {
211  auto& node = _get_node(ctx.source_id());
212  if(auto host_id{node.host_id()}) {
213  auto& host = _get_host(extract(host_id));
214  host.set_hostname(std::move(extract(hostname))).notice_alive();
215  _tracker.for_each_host_node_state(
216  extract(host_id), [&](auto, auto& host_node) {
217  host_node.add_change(remote_node_change::host_info);
218  });
219  }
220  }
221  }
222 
223  void
224  on_build_info_received(const result_context& ctx, build_info&& info) final {
225  auto& node = _get_node(ctx.source_id()).notice_alive();
226  if(auto inst_id{node.instance_id()}) {
227  auto& inst = _get_instance(extract(inst_id));
228  inst.assign(std::move(info));
229  _tracker.for_each_host_node_state(
230  extract(inst_id), [&](auto, auto& inst_node) {
231  inst_node.add_change(remote_node_change::build_info);
232  });
233  }
234  }
235 
236  void on_cpu_concurrent_threads_received(
237  const result_context& ctx,
238  valid_if_positive<span_size_t>&& opt_value) final {
239  if(opt_value) {
240  auto& node = _get_node(ctx.source_id()).notice_alive();
241  if(auto host_id{node.host_id()}) {
242  auto& host = _get_host(extract(host_id)).notice_alive();
243  host.set_cpu_concurrent_threads(extract(opt_value));
244  _tracker.for_each_host_node_state(
245  extract(host_id), [&](auto, auto& host_node) {
246  host_node.add_change(remote_node_change::hardware_config);
247  });
248  }
249  }
250  }
251 
252  void on_short_average_load_received(
253  const result_context& ctx,
254  valid_if_nonnegative<float>&& opt_value) final {
255  if(opt_value) {
256  auto& node = _get_node(ctx.source_id()).notice_alive();
257  if(auto host_id{node.host_id()}) {
258  auto& host = _get_host(extract(host_id)).notice_alive();
259  host.set_short_average_load(extract(opt_value));
260  _tracker.for_each_host_node_state(
261  extract(host_id), [&](auto, auto& host_node) {
262  host_node.add_change(remote_node_change::sensor_values);
263  });
264  }
265  }
266  }
267 
268  void on_long_average_load_received(
269  const result_context& ctx,
270  valid_if_nonnegative<float>&& opt_value) final {
271  if(opt_value) {
272  auto& node = _get_node(ctx.source_id()).notice_alive();
273  if(auto host_id{node.host_id()}) {
274  auto& host = _get_host(extract(host_id)).notice_alive();
275  host.set_long_average_load(extract(opt_value));
276  _tracker.for_each_host_node_state(
277  extract(host_id), [&](auto, auto& host_node) {
278  host_node.add_change(remote_node_change::sensor_values);
279  });
280  }
281  }
282  }
283 
284  void on_free_ram_size_received(
285  const result_context& ctx,
286  valid_if_positive<span_size_t>&& opt_value) final {
287  if(opt_value) {
288  auto& node = _get_node(ctx.source_id()).notice_alive();
289  if(auto host_id{node.host_id()}) {
290  auto& host = _get_host(extract(host_id)).notice_alive();
291  host.set_free_ram_size(extract(opt_value));
292  _tracker.for_each_host_node_state(
293  extract(host_id), [&](auto, auto& host_node) {
294  host_node.add_change(remote_node_change::sensor_values);
295  });
296  }
297  }
298  }
299 
300  void on_total_ram_size_received(
301  const result_context& ctx,
302  valid_if_positive<span_size_t>&& opt_value) final {
303  if(opt_value) {
304  auto& node = _get_node(ctx.source_id()).notice_alive();
305  if(auto host_id{node.host_id()}) {
306  auto& host = _get_host(extract(host_id)).notice_alive();
307  host.set_total_ram_size(extract(opt_value));
308  _tracker.for_each_host_node_state(
309  extract(host_id), [&](auto, auto& host_node) {
310  host_node.add_change(remote_node_change::hardware_config);
311  });
312  }
313  }
314  }
315 
316  void on_free_swap_size_received(
317  const result_context& ctx,
318  valid_if_nonnegative<span_size_t>&& opt_value) final {
319  if(opt_value) {
320  auto& node = _get_node(ctx.source_id()).notice_alive();
321  if(auto host_id{node.host_id()}) {
322  auto& host = _get_host(extract(host_id)).notice_alive();
323  host.set_free_swap_size(extract(opt_value));
324  _tracker.for_each_host_node_state(
325  extract(host_id), [&](auto, auto& host_node) {
326  host_node.add_change(remote_node_change::sensor_values);
327  });
328  }
329  }
330  }
331 
332  void on_total_swap_size_received(
333  const result_context& ctx,
334  valid_if_nonnegative<span_size_t>&& opt_value) final {
335  if(opt_value) {
336  auto& node = _get_node(ctx.source_id()).notice_alive();
337  if(auto host_id{node.host_id()}) {
338  auto& host = _get_host(extract(host_id)).notice_alive();
339  host.set_total_swap_size(extract(opt_value));
340  _tracker.for_each_host_node_state(
341  extract(host_id), [&](auto, auto& host_node) {
342  host_node.add_change(remote_node_change::hardware_config);
343  });
344  }
345  }
346  }
347 
348  void on_ping_response(
349  identifier_t node_id,
350  message_sequence_t sequence_no,
351  std::chrono::microseconds age,
352  verification_bits) final {
353  _get_node(node_id).ping_response(sequence_no, age);
354  }
355 
356  void on_ping_timeout(
357  identifier_t node_id,
358  message_sequence_t sequence_no,
359  std::chrono::microseconds age) final {
360  _get_node(node_id).ping_timeout(sequence_no, age);
361  }
362 };
363 //------------------------------------------------------------------------------
364 } // namespace eagine::msgbus
365 
366 #endif // EAGINE_MESSAGE_BUS_SERVICE_TRACKER_HPP
bitfield< verification_bit > verification_bits
Alias for a bus message verification bitfield.
Definition: verification.hpp:47
@ info
Informational log entries.
auto update() const noexcept -> bool
Updates the internal endpoint state (should be called repeatedly).
Definition: subscriber.hpp:51
std::uint32_t message_sequence_t
Alias for message sequence number type.
Definition: types.hpp:22
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
auto extract(const ok< Outcome > &x) noexcept -> const auto &
Overload of extract for instantiations of the ok template.
Definition: extract.hpp:193
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).