OGLplus  (0.59.0) a C++ wrapper for rendering APIs

subscriber.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_SUBSCRIBER_HPP
10 #define EAGINE_MESSAGE_BUS_SUBSCRIBER_HPP
11 
12 #include "../interface.hpp"
13 #include "../maybe_unused.hpp"
14 #include "../span.hpp"
15 #include "endpoint.hpp"
16 #include "handler_map.hpp"
17 #include "verification.hpp"
18 #include <array>
19 #include <type_traits>
20 #include <vector>
21 
22 namespace eagine::msgbus {
23 //------------------------------------------------------------------------------
30 public:
32  explicit operator bool() noexcept {
33  return _endpoint != nullptr;
34  }
35 
38  auto bus() noexcept -> auto& {
39  EAGINE_ASSERT(_endpoint != nullptr);
40  return *_endpoint;
41  }
42 
45  auto bus() const noexcept -> auto& {
46  EAGINE_ASSERT(_endpoint != nullptr);
47  return *_endpoint;
48  }
49 
51  auto update() const noexcept -> bool {
52  if(EAGINE_LIKELY(_endpoint)) {
53  return _endpoint->update();
54  }
55  return false;
56  }
57 
59  auto verify_bits(const stored_message& message) noexcept
61  if(EAGINE_LIKELY(_endpoint)) {
62  return message.verify_bits(_endpoint->ctx(), *_endpoint);
63  }
64  return {};
65  }
66 
70  if(EAGINE_LIKELY(_endpoint)) {
71  _endpoint->query_subscriptions_of(target_id);
72  }
73  }
74 
78  if(EAGINE_LIKELY(_endpoint)) {
79  _endpoint->query_subscribers_of(sub_msg);
80  }
81  }
82 
84  subscriber_base(const subscriber_base&) = delete;
85 
87  auto operator=(subscriber_base&&) = delete;
88 
90  auto operator=(const subscriber_base&) = delete;
91 
92 protected:
93  using method_handler = typename endpoint::method_handler;
94  struct handler_entry {
95  message_id msg_id{};
96  method_handler handler{};
97 
98  constexpr handler_entry() noexcept = default;
99 
100  constexpr handler_entry(message_id id, method_handler hndlr) noexcept
101  : msg_id{std::move(id)}
102  , handler{std::move(hndlr)} {}
103 
104  template <
105  identifier_t ClassId,
106  identifier_t MethodId,
107  typename Class,
108  bool (Class::*HandlerFunc)(const message_context&, stored_message&)>
109  handler_entry(
110  Class* instance,
111  static_message_handler_map<
112  static_message_id<ClassId, MethodId>,
113  member_function_constant<
114  bool (Class::*)(const message_context&, stored_message&),
115  HandlerFunc>> msg_map) noexcept
116  : msg_id{ClassId, MethodId}
117  , handler{instance, msg_map.method()} {}
118 
119  template <
120  identifier_t ClassId,
121  identifier_t MethodId,
122  typename Class,
123  bool (Class::*HandlerFunc)(const message_context&, stored_message&)>
124  handler_entry(
125  Class* instance,
126  static_message_handler_map<
127  static_message_id<ClassId, MethodId>,
128  member_function_constant<
129  bool (Class::*)(const message_context&, stored_message&) const,
130  HandlerFunc>> msg_map) noexcept
131  : msg_id{ClassId, MethodId}
132  , handler{instance, msg_map.method()} {}
133  };
134 
135  ~subscriber_base() noexcept = default;
136  constexpr subscriber_base() noexcept = default;
137  constexpr subscriber_base(endpoint& bus) noexcept
138  : _endpoint{&bus} {}
139  subscriber_base(subscriber_base&& temp) noexcept
140  : _endpoint{temp._endpoint} {
141  temp._endpoint = nullptr;
142  }
143 
144  void _subscribe_to(span<const handler_entry> msg_handlers) const {
145  if(EAGINE_LIKELY(_endpoint)) {
146  for(auto& entry : msg_handlers) {
147  _endpoint->subscribe(entry.msg_id);
148  }
149  }
150  }
151 
152  void
153  _unsubscribe_from(span<const handler_entry> msg_handlers) const noexcept {
154  if(_endpoint) {
155  for(auto& entry : msg_handlers) {
156  try {
157  _endpoint->unsubscribe(entry.msg_id);
158  } catch(...) {
159  }
160  }
161  }
162  }
163 
164  void _announce_subscriptions(span<const handler_entry> msg_handlers) const {
165  if(EAGINE_LIKELY(_endpoint)) {
166  for(auto& entry : msg_handlers) {
167  _endpoint->say_subscribes_to(entry.msg_id);
168  }
169  }
170  }
171 
172  void _allow_subscriptions(span<const handler_entry> msg_handlers) const {
173  if(EAGINE_LIKELY(_endpoint)) {
174  for(auto& entry : msg_handlers) {
175  _endpoint->allow_message_type(entry.msg_id);
176  }
177  }
178  }
179 
180  void _retract_subscriptions(
181  span<const handler_entry> msg_handlers) const noexcept {
182  if(EAGINE_LIKELY(_endpoint)) {
183  for(auto& entry : msg_handlers) {
184  try {
185  _endpoint->say_unsubscribes_from(entry.msg_id);
186  } catch(...) {
187  }
188  }
189  }
190  }
191 
192  void _respond_to_subscription_query(
193  identifier_t source_id,
194  span<const handler_entry> msg_handlers) const {
195  if(EAGINE_LIKELY(_endpoint)) {
196  for(auto& entry : msg_handlers) {
197  _endpoint->say_subscribes_to(source_id, entry.msg_id);
198  }
199  }
200  }
201 
202  void _respond_to_subscription_query(
203  identifier_t source_id,
204  message_id sub_msg,
205  span<const handler_entry> msg_handlers) const {
206  if(EAGINE_LIKELY(_endpoint)) {
207  for(auto& entry : msg_handlers) {
208  if(entry.msg_id == sub_msg) {
209  _endpoint->say_subscribes_to(source_id, sub_msg);
210  return;
211  }
212  }
213  _endpoint->say_not_subscribed_to(source_id, sub_msg);
214  }
215  }
216 
217  auto _process_one(span<const handler_entry> msg_handlers) -> bool {
218  for(auto& entry : msg_handlers) {
219  if(bus().process_one(entry.msg_id, entry.handler)) {
220  return true;
221  }
222  }
223  return false;
224  }
225 
226  auto _process_all(span<const handler_entry> msg_handlers) -> span_size_t {
227  span_size_t result{0};
228  for(auto& entry : msg_handlers) {
229  result += bus().process_all(entry.msg_id, entry.handler);
230  }
231  return result;
232  }
233 
234  void _finish() noexcept {
235  if(EAGINE_LIKELY(_endpoint)) {
236  try {
237  _endpoint->finish();
238  } catch(...) {
239  }
240  }
241  }
242 
243 private:
244  endpoint* _endpoint{nullptr};
245 };
246 //------------------------------------------------------------------------------
250 template <std::size_t N>
252 public:
254  using method_handler = typename endpoint::method_handler;
255 
256  using handler_entry = typename subscriber_base::handler_entry;
257 
258  template <
259  typename... MsgHandlers,
260  typename = std::enable_if_t<sizeof...(MsgHandlers) == N>>
261  static_subscriber(endpoint& bus, MsgHandlers&&... msg_handlers)
262  : subscriber_base{bus}
263  , _msg_handlers{{std::forward<MsgHandlers>(msg_handlers)...}} {
264  this->_subscribe_to(view(_msg_handlers));
265  }
266 
271  template <
272  typename Class,
273  typename... MsgMaps,
274  typename = std::enable_if_t<sizeof...(MsgMaps) == N>>
275  static_subscriber(endpoint& bus, Class* instance, MsgMaps... msg_maps)
276  : static_subscriber(bus, handler_entry(instance, msg_maps)...) {}
277 
279  static_subscriber(static_subscriber&& temp) = delete;
280 
282  static_subscriber(const static_subscriber&) = delete;
283 
285  auto operator=(static_subscriber&&) = delete;
286 
288  auto operator=(const static_subscriber&) = delete;
289 
290  ~static_subscriber() noexcept {
291  this->_unsubscribe_from(view(_msg_handlers));
292  }
293 
295  auto process_one() -> bool {
296  return this->_process_one(view(_msg_handlers));
297  }
298 
301  return this->_process_all(view(_msg_handlers));
302  }
303 
308  void announce_subscriptions() const {
309  this->_announce_subscriptions(view(_msg_handlers));
310  }
311 
316  void allow_subscriptions() const {
317  this->_allow_subscriptions(view(_msg_handlers));
318  }
319 
324  void retract_subscriptions() const noexcept {
325  this->_retract_subscriptions(view(_msg_handlers));
326  }
327 
331  void respond_to_subscription_query(identifier_t source_id) const noexcept {
332  this->_respond_to_subscription_query(source_id, view(_msg_handlers));
333  }
334 
339  identifier_t source_id,
340  message_id sub_msg) const noexcept {
341  this->_respond_to_subscription_query(
342  source_id, sub_msg, view(_msg_handlers));
343  }
344 
345 private:
346  std::array<handler_entry, N> _msg_handlers;
347 };
348 //------------------------------------------------------------------------------
353  : public interface<subscriber>
354  , public subscriber_base {
355 public:
356  using handler_entry = subscriber_base::handler_entry;
357 
359  using method_handler =
360  callable_ref<bool(const message_context&, stored_message&)>;
361 
363  subscriber(endpoint& bus) noexcept
364  : subscriber_base{bus} {}
365 
367  template <
368  typename Class,
369  bool (Class::*Method)(const message_context&, stored_message&)>
371  Class* instance,
372  message_id msg_id,
374  bool (Class::*)(const message_context&, stored_message&),
375  Method> method) {
376  _msg_handlers.emplace_back(msg_id, method_handler{instance, method});
377  }
378 
380  template <
381  typename Class,
382  bool (Class::*Method)(const message_context&, stored_message&)>
384  Class* instance,
386  bool (Class::*)(const message_context&, stored_message&),
387  Method>> msg_map) noexcept {
388  add_method(instance, msg_map.msg_id(), msg_map.method());
389  }
390 
392  template <
393  typename Class,
394  bool (Class::*Method)(const message_context&, stored_message&)>
395  void add_method(std::tuple<
396  Class*,
398  bool (Class::*)(const message_context&, stored_message&),
399  Method>>> imm) noexcept {
400  add_method(
401  std::get<0>(imm),
402  std::get<1>(imm).msg_id(),
403  std::get<1>(imm).method());
404  }
405 
407  template <
408  typename Class,
409  bool (Class::*Method)(const message_context&, stored_message&),
410  identifier_t ClassId,
411  identifier_t MethodId>
413  Class* instance,
417  bool (Class::*)(const message_context&, stored_message&),
418  Method>> msg_map) noexcept {
419  add_method(instance, msg_map.msg_id(), msg_map.method());
420  }
421 
424  auto process_one() -> bool {
425  return this->_process_one(view(_msg_handlers));
426  }
427 
431  return this->_process_all(view(_msg_handlers));
432  }
433 
438  void announce_subscriptions() const {
439  this->_announce_subscriptions(view(_msg_handlers));
440  }
441 
446  void allow_subscriptions() const {
447  this->_allow_subscriptions(view(_msg_handlers));
448  }
449 
454  void retract_subscriptions() const noexcept {
455  this->_retract_subscriptions(view(_msg_handlers));
456  }
457 
461  void respond_to_subscription_query(identifier_t source_id) const noexcept {
462  this->_respond_to_subscription_query(source_id, view(_msg_handlers));
463  }
464 
469  identifier_t source_id,
470  message_id sub_msg) const noexcept {
471  this->_respond_to_subscription_query(
472  source_id, sub_msg, view(_msg_handlers));
473  }
474 
475 protected:
/// Intentionally empty: registers no handlers.
void add_methods() {
}
477 
478  void init() {
479  this->_subscribe_to(view(_msg_handlers));
480  }
481 
482  void finish() noexcept {
483  this->_unsubscribe_from(view(_msg_handlers));
484  this->_finish();
485  }
486 
487 private:
488  std::vector<handler_entry> _msg_handlers;
489 };
490 //------------------------------------------------------------------------------
491 } // namespace eagine::msgbus
492 
493 #endif // EAGINE_MESSAGE_BUS_SUBSCRIBER_HPP
auto process_one() -> bool
Handles (and removes) one of the pending received messages.
Definition: subscriber.hpp:424
Template with two identifier parameters representing class/method pair.
Definition: message_id.hpp:18
auto update() -> bool
Updates the internal state, sends and receives pending messages.
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
void announce_subscriptions() const
Sends messages to the bus saying which messages this can handle.
Definition: subscriber.hpp:438
auto ctx() noexcept -> context &
Returns a reference to the message bus context.
Definition: endpoint.hpp:72
void respond_to_subscription_query(identifier_t source_id) const noexcept
Sends messages responding to a subscription query.
Definition: subscriber.hpp:461
void query_subscriptions_of(identifier_t target_id)
Posts a message requesting all subscriptions of a target node.
void respond_to_subscription_query(identifier_t source_id, message_id sub_msg) const noexcept
Sends messages responding to a subscription query.
Definition: subscriber.hpp:338
void allow_subscriptions() const
Sends messages to the router saying which messages should be forwarded.
Definition: subscriber.hpp:316
void announce_subscriptions() const
Sends messages to the bus saying which messages this can handle.
Definition: subscriber.hpp:308
void query_subscriptions_of(identifier_t target_id)
Queries the subscriptions of the remote endpoint with the specified id.
Definition: subscriber.hpp:69
Template for subscribers with variable count of handled message types.
Definition: subscriber.hpp:352
Class for manipulating and testing a group of enumeration-based bits.
Definition: bitfield.hpp:19
auto update() const noexcept -> bool
Updates the internal endpoint state (should be called repeatedly).
Definition: subscriber.hpp:51
static constexpr auto view(T *addr, S size) noexcept -> const_span< T >
Creates a view starting at the specified pointer and specified length.
Definition: span.hpp:458
@ endpoint
Message bus client endpoint.
auto process_all() -> span_size_t
Processes all pending enqueued messages.
Definition: subscriber.hpp:300
void say_subscribes_to(message_id)
Broadcasts a message that this subscribes to message with given id.
void unsubscribe(message_id)
Unsubscribes from messages with the specified id/type.
void respond_to_subscription_query(identifier_t source_id) const noexcept
Sends messages responding to a subscription query.
Definition: subscriber.hpp:331
void retract_subscriptions() const noexcept
Sends messages to the bus saying which messages this cannot handle.
Definition: subscriber.hpp:324
callable_ref< bool(const message_context &, stored_message &)> method_handler
Alias for callable handling received messages.
Definition: endpoint.hpp:365
Template for subscribers with predefined count of handled message types.
Definition: subscriber.hpp:251
static_subscriber(endpoint &bus, Class *instance, MsgMaps... msg_maps)
Construction from a reference to endpoint and some message maps.
Definition: subscriber.hpp:275
Base class for message bus subscribers.
Definition: subscriber.hpp:29
auto verify_bits(const stored_message &message) noexcept -> verification_bits
Uses the associated endpoint to verify the specified message.
Definition: subscriber.hpp:59
void respond_to_subscription_query(identifier_t source_id, message_id sub_msg) const noexcept
Sends messages responding to a subscription query.
Definition: subscriber.hpp:468
void say_not_subscribed_to(identifier_t target_id, message_id)
Posts a message that this is not subscribed to message with given type.
auto bus() const noexcept -> auto &
Returns a reference to the associated endpoint (const overload; the referenced endpoint is not const-qualified).
Definition: subscriber.hpp:45
Represents a mapping from a message type id to member function constant.
Definition: handler_map.hpp:26
@ source_id
The source has been verified.
void add_method(Class *instance, message_handler_map< member_function_constant< bool(Class::*)(const message_context &, stored_message &), Method >> msg_map) noexcept
Adds a handler for messages with the specified message id.
Definition: subscriber.hpp:383
void retract_subscriptions() const noexcept
Sends messages to the bus saying which messages this cannot handle.
Definition: subscriber.hpp:454
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
auto operator=(subscriber_base &&)=delete
Not move assignable.
void say_unsubscribes_from(message_id)
Broadcasts a message that this unsubscribes from message with given type.
void allow_message_type(message_id)
Sends a message to router to start blocking message type for this endpoint.
void subscribe(message_id)
Subscribes to messages with the specified id/type.
void add_method(Class *instance, static_message_handler_map< static_message_id< ClassId, MethodId >, member_function_constant< bool(Class::*)(const message_context &, stored_message &), Method >> msg_map) noexcept
Adds a handler for messages with the specified message id.
Definition: subscriber.hpp:412
Message bus client endpoint that can send and receive messages.
Definition: endpoint.hpp:30
auto process_all() -> span_size_t
Handles (and removes) all pending received messages.
Definition: subscriber.hpp:430
void allow_subscriptions() const
Sends messages to the router saying which messages should be forwarded.
Definition: subscriber.hpp:446
subscriber(endpoint &bus) noexcept
Construction from a reference to endpoint.
Definition: subscriber.hpp:363
void finish()
Says to the message bus that this endpoint is disconnecting.
Definition: endpoint.hpp:154
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
void add_method(Class *instance, message_id msg_id, member_function_constant< bool(Class::*)(const message_context &, stored_message &), Method > method)
Adds a handler for messages with the specified message id.
Definition: subscriber.hpp:370
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
auto process_one() -> bool
Processes one pending enqueued message.
Definition: subscriber.hpp:295
Declaration of compile-time member function pointer wrappers.
Definition: mem_func_const.hpp:19
auto bus() noexcept -> auto &
Returns a reference to the associated endpoint.
Definition: subscriber.hpp:38
Combines message information and an owned message content buffer.
Definition: message.hpp:316
subscriber_base(const subscriber_base &)=delete
Not copy constructible.
void add_method(std::tuple< Class *, message_handler_map< member_function_constant< bool(Class::*)(const message_context &, stored_message &), Method >>> imm) noexcept
Adds a handler for messages with the specified message id.
Definition: subscriber.hpp:395
Represents a mapping from a message type id to member function constant.
Definition: handler_map.hpp:52
void query_subscribers_of(message_id sub_msg)
Queries remote nodes subscribing to the specified message.
Definition: subscriber.hpp:77
void query_subscribers_of(message_id)
Posts a message requesting all subscribers of a given message type.

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).