OGLplus  (0.59.0) a C++ wrapper for rendering APIs

direct.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_DIRECT_HPP
10 #define EAGINE_MESSAGE_BUS_DIRECT_HPP
11 
12 #include "../bool_aggregate.hpp"
13 #include "../branch_predict.hpp"
14 #include "../main_ctx_object.hpp"
15 #include "../value_tracker.hpp"
16 #include "conn_factory.hpp"
17 #include <map>
18 #include <mutex>
19 
20 namespace eagine::msgbus {
21 //------------------------------------------------------------------------------
28 public:
31  : main_ctx_object{EAGINE_ID(DrctConnSt), parent} {}
32 
/// @brief Sends a message to the server counterpart.
/// @param msg_id identifier of the message being sent.
/// @param message view of the message content to be stored.
///
/// The message is pushed into the client-to-server storage while
/// `_c2s_mutex` is held.
34  void send_to_server(message_id msg_id, const message_view& message) {
35  std::unique_lock lock{_c2s_mutex};
36  _client_to_server.push(msg_id, message);
37  }
38 
/// @brief Sends a message to the client counterpart.
/// @param msg_id identifier of the message being sent.
/// @param message view of the message content to be stored.
///
/// The message is pushed into the server-to-client storage while
/// `_s2c_mutex` is held.
40  void send_to_client(message_id msg_id, const message_view& message) {
41  std::unique_lock lock{_s2c_mutex};
42  _server_to_client.push(msg_id, message);
43  }
44 
/// @brief Fetches received messages from the client counterpart.
/// @param handler callable forwarded to `fetch_all` of the client-to-server
/// storage.
/// @return The value returned by `fetch_all` for that storage.
///
/// `_c2s_mutex` stays locked for the whole `fetch_all` call, so it is also
/// held while `handler` runs.
46  auto fetch_from_client(connection::fetch_handler handler) noexcept -> bool {
47  std::unique_lock lock{_c2s_mutex};
48  return _client_to_server.fetch_all(handler);
49  }
50 
/// @brief Fetches received messages from the server counterpart.
/// @param handler callable forwarded to `fetch_all` of the server-to-client
/// storage.
/// @return The value returned by `fetch_all` for that storage.
///
/// `_s2c_mutex` stays locked for the whole `fetch_all` call, so it is also
/// held while `handler` runs.
52  auto fetch_from_server(connection::fetch_handler handler) noexcept -> bool {
53  std::unique_lock lock{_s2c_mutex};
54  return _server_to_client.fetch_all(handler);
55  }
56 
/// @brief Logs chart samples with the numbers of currently stored messages.
///
/// The whole body is guarded by `if constexpr`, so nothing is done unless the
/// `stat` log severity is enabled. Each direction is handled in its own
/// scope, so only one of the two mutexes is held at any time.
57  void log_message_counts() noexcept {
58  if constexpr(is_log_level_enabled_v<log_event_severity::stat>) {
59  {
60  std::unique_lock lock{_s2c_mutex};
    // A sample is logged only when the tracker reports a change.
    // NOTE(review): presumably the tracker compares count / 16 - confirm
    // against value_change_div_tracker.
61  if(_s2c_count.has_changed(_server_to_client.count())) {
62  this->log_chart_sample(
63  EAGINE_ID(s2cMsgCnt), float(_s2c_count.get()));
64  }
65  }
66 
67  {
68  std::unique_lock lock{_c2s_mutex};
    // Same as above, for the client-to-server direction.
69  if(_c2s_count.has_changed(_client_to_server.count())) {
70  this->log_chart_sample(
71  EAGINE_ID(c2sMsgCnt), float(_c2s_count.get()));
72  }
73  }
74  }
75  }
76 
77 private:
/// Locked around every access to `_server_to_client` and `_s2c_count`.
78  std::mutex _s2c_mutex;
/// Locked around every access to `_client_to_server` and `_c2s_count`.
79  std::mutex _c2s_mutex;
/// Messages pushed by `send_to_client`, drained by `fetch_from_server`.
80  message_storage _server_to_client;
/// Messages pushed by `send_to_server`, drained by `fetch_from_client`.
81  message_storage _client_to_server;
/// Tracker of the server-to-client message count used for stat logging.
82  value_change_div_tracker<span_size_t, 16> _s2c_count{0};
/// Tracker of the client-to-server message count used for stat logging.
83  value_change_div_tracker<span_size_t, 16> _c2s_count{0};
84 };
85 //------------------------------------------------------------------------------
94 public:
/// @brief Alias for shared pointer to direct state type.
96  using shared_state = std::shared_ptr<direct_connection_state>;
97 
101 
104  : main_ctx_object{EAGINE_ID(DrctConnAd), parent} {}
105 
/// @brief Creates and returns the shared state for a new client connection.
/// @return Shared pointer to the newly created state.
///
/// The state is constructed with this address object as its parent and a
/// copy of the pointer is appended to `_pending`, where it waits for the
/// next call to `process_all`.
108  auto connect() -> shared_state {
109  auto state{std::make_shared<direct_connection_state>(*this)};
110  _pending.push_back(state);
111  return state;
112  }
113 
/// @brief Handles the pending server counterparts for created client
/// connections.
/// @param handler callable invoked once with each pending shared state.
/// @return The `some_true` aggregate converted to bool; it is updated once
/// per handled state.
///
/// After all pending states were passed to `handler`, `_pending` is cleared.
/// NOTE(review): `handler` runs while `_pending` is being iterated; if it
/// could re-enter `connect()` on this address, the `push_back` there may
/// invalidate the iteration - confirm that callers never do that.
116  auto process_all(process_handler handler) -> bool {
117  some_true something_done{};
118  for(auto& state : _pending) {
119  handler(state);
120  something_done();
121  }
122  _pending.clear();
123  return something_done;
124  }
125 
126 private:
/// States created by `connect()` that were not yet passed to `process_all`.
127  std::vector<shared_state> _pending;
128 };
129 //------------------------------------------------------------------------------
/// @brief Implementation of the connection_info interface for direct
/// connections.
/// @tparam Base the base class being extended; its constructors are
/// inherited.
134 template <typename Base>
135 class direct_connection_info : public Base {
136 public:
137  using Base::Base;
138 
    // NOTE(review): the return statement of this function (listing line 140)
    // is not present in this listing; presumably it returns the in-process
    // connection kind - confirm against the original header.
139  auto kind() -> connection_kind final {
141  }
142 
    // NOTE(review): the return statement of this function (listing line 144)
    // is not present in this listing - confirm against the original header.
143  auto addr_kind() -> connection_addr_kind final {
145  }
146 
    /// Returns the identifier `Direct` as the type id of this implementation.
147  auto type_id() -> identifier final {
148  return EAGINE_ID(Direct);
149  }
150 };
151 //------------------------------------------------------------------------------
158 public:
160  std::shared_ptr<direct_connection_address>& address) noexcept
161  : _weak_address{address}
162  , _state{address->connect()} {}
163 
/// @brief Checks if the connection is in usable state.
/// @return true if a shared state is held after `_checkup()` had the chance
/// to re-create it, false otherwise.
164  auto is_usable() -> bool final {
165  _checkup();
166  return bool(_state);
167  }
168 
/// @brief Updates the internal state of the connection (called repeatedly).
/// @return Always false.
///
/// The only thing done here is logging of the message counts when a shared
/// state is held.
169  auto update() -> bool final {
170  if(EAGINE_LIKELY(_state)) {
171  _state->log_message_counts();
172  }
173  return false;
174  }
175 
/// @brief Sends a message with the specified id.
/// @param msg_id identifier of the message being sent.
/// @param message view of the message content.
/// @return true if a shared state was available and the message was passed
/// to `send_to_server`, false otherwise.
176  auto send(message_id msg_id, const message_view& message) -> bool final {
    // Try to (re-)obtain the shared state first if it is missing.
177  _checkup();
178  if(EAGINE_LIKELY(_state)) {
179  _state->send_to_server(msg_id, message);
180  return true;
181  }
182  return false;
183  }
184 
/// @brief Fetch all enqueued messages that have been received since last
/// fetch.
/// @param handler callable forwarded to `fetch_from_server` of the shared
/// state.
/// @return The result of `fetch_from_server`, or false if no shared state is
/// available.
185  auto fetch_messages(connection::fetch_handler handler) -> bool final {
    // Try to (re-)obtain the shared state first if it is missing.
186  _checkup();
187  if(EAGINE_LIKELY(_state)) {
188  return _state->fetch_from_server(handler);
189  }
190  return false;
191  }
192 
/// @brief Cleans up the connection before destroying it.
///
/// Only logs the message counts when a shared state is held; the state
/// itself is not released here.
193  void cleanup() final {
194  if(EAGINE_LIKELY(_state)) {
195  _state->log_message_counts();
196  }
197  }
198 
199 private:
/// Non-owning reference to the address; locked in `_checkup()` to create a
/// new state when `_state` is empty.
200  std::weak_ptr<direct_connection_address> _weak_address;
/// Shared state obtained from the address by `connect()`.
201  std::shared_ptr<direct_connection_state> _state;
202 
/// Tries to obtain a new shared state when none is held.
///
/// Nothing is done if `_state` is already set. Otherwise, if the address
/// object still exists, a new state is requested from it with `connect()`;
/// if the address is gone, `_state` stays empty.
203  inline void _checkup() {
204  if(EAGINE_UNLIKELY(!_state)) {
205  if(auto address{_weak_address.lock()}) {
206  _state = address->connect();
207  }
208  }
209  }
210 };
211 //------------------------------------------------------------------------------
218 public:
/// @brief Construction from the shared state of a direct connection.
/// @param state shared state; only a weak reference to it is stored, so this
/// object does not keep the state alive.
219  direct_server_connection(std::shared_ptr<direct_connection_state>& state)
220  : _weak_state{state} {}
221 
/// @brief Sends a message with the specified id.
/// @param msg_id identifier of the message being sent.
/// @param message view of the message content.
/// @return true if the shared state still exists and the message was passed
/// to `send_to_client`, false if the state has already expired.
222  auto send(message_id msg_id, const message_view& message) -> bool final {
223  if(auto state{_weak_state.lock()}) {
224  state->send_to_client(msg_id, message);
225  return true;
226  }
227  return false;
228  }
229 
/// @brief Fetch all enqueued messages that have been received since last
/// fetch.
/// @param handler callable forwarded to `fetch_from_client` of the shared
/// state.
/// @return The result of `fetch_from_client`, or false if the shared state
/// has already expired.
230  auto fetch_messages(connection::fetch_handler handler) -> bool final {
231  if(auto state{_weak_state.lock()}) {
232  return state->fetch_from_client(handler);
233  }
234  return false;
235  }
236 
237 private:
/// Non-owning reference to the shared state; locked on each send / fetch.
238  std::weak_ptr<direct_connection_state> _weak_state;
239 };
240 //------------------------------------------------------------------------------
245  : public direct_connection_info<acceptor>
246  , public main_ctx_object {
247  using shared_state = std::shared_ptr<direct_connection_state>;
248 
249 public:
252  main_ctx_parent parent,
253  std::shared_ptr<direct_connection_address> address) noexcept
254  : main_ctx_object{EAGINE_ID(DrctAccptr), parent}
255  , _address{std::move(address)} {}
256 
259  : main_ctx_object{EAGINE_ID(DrctAccptr), parent}
260  , _address{std::make_shared<direct_connection_address>(*this)} {}
261 
/// @brief Lets the handler process the pending accepted connections.
/// @param handler callable receiving a `std::unique_ptr<connection>` for each
/// pending connection.
/// @return The `some_true` aggregate converted to bool; it is fed with the
/// result of `process_all` on the address. False if there is no address.
262  auto process_accepted(const accept_handler& handler) -> bool final {
263  some_true something_done{};
264  if(_address) {
    // Adapts the accept handler to the per-state handler expected by
    // process_all: each pending state is wrapped in a new server-side
    // connection object before being handed over.
265  auto wrapped_handler = [&handler](shared_state& state) {
266  handler(std::unique_ptr<connection>{
267  std::make_unique<direct_server_connection>(state)});
268  };
    // NOTE(review): {construct_from, ...} presumably builds a non-owning
    // callable reference to wrapped_handler - confirm against
    // callable_ref.hpp.
269  something_done(
270  _address->process_all({construct_from, wrapped_handler}));
271  }
272  return something_done;
273  }
274 
/// @brief Makes a new client-side direct connection.
/// @return A new `direct_client_connection` bound to the address of this
/// acceptor, or an empty pointer if the acceptor has no address.
276  auto make_connection() -> std::unique_ptr<connection> {
277  if(_address) {
278  return std::unique_ptr<connection>{
279  std::make_unique<direct_client_connection>(_address)};
280  }
281  return {};
282  }
283 
284 private:
/// Address object whose pending connections are accepted by
/// `process_accepted` and to which `make_connection` binds new clients.
285  std::shared_ptr<direct_connection_address> _address{};
286 };
287 //------------------------------------------------------------------------------
292  : public direct_connection_info<connection_factory>
293  , public main_ctx_object {
294 public:
297 
300  : main_ctx_object{EAGINE_ID(DrctConnFc), parent}
301  , _default_addr{_make_addr()} {}
302 
/// @brief Make a new acceptor listening on the specified address.
/// @param addr_str name of the address to listen on.
/// @return A new `direct_acceptor` having this factory as its parent.
///
/// When `addr_str` converts to true the address object registered under that
/// name is used (it is created on first use by `_get`); otherwise the
/// default address of this factory is used.
/// NOTE(review): presumably `addr_str` converts to false when it is empty -
/// confirm against string_view.
303  auto make_acceptor(string_view addr_str)
304  -> std::unique_ptr<acceptor> final {
305  if(addr_str) {
306  return std::make_unique<direct_acceptor>(*this, _get(addr_str));
307  }
308  return std::make_unique<direct_acceptor>(*this, _default_addr);
309  }
310 
312  -> std::unique_ptr<connection> final {
313  if(addr_str) {
314  return std::make_unique<direct_client_connection>(_get(addr_str));
315  }
316  return std::make_unique<direct_client_connection>(_default_addr);
317  }
318 
319 private:
320  std::shared_ptr<direct_connection_address> _default_addr;
321  std::map<
322  std::string,
323  std::shared_ptr<direct_connection_address>,
325  _addrs;
326 
/// Creates a new address object having this factory as its parent.
327  auto _make_addr() -> std::shared_ptr<direct_connection_address> {
328  return std::make_shared<direct_connection_address>(*this);
329  }
330 
/// Returns the address object registered under the given name.
///
/// If no address is registered under `addr_str` yet, a new one is created
/// with `_make_addr()` and stored in `_addrs` under a string copy of the
/// name. The returned reference refers to the pointer stored in the map.
331  auto _get(string_view addr_str)
332  -> std::shared_ptr<direct_connection_address>& {
333  auto pos = _addrs.find(addr_str);
334  if(pos == _addrs.end()) {
    // emplace returns an {iterator, inserted} pair; only the iterator is
    // needed here.
335  pos = _addrs.emplace(to_string(addr_str), _make_addr()).first;
336  }
337  EAGINE_ASSERT(pos != _addrs.end());
338  return pos->second;
339  }
340 };
341 //------------------------------------------------------------------------------
342 } // namespace eagine::msgbus
343 
344 #endif // EAGINE_MESSAGE_BUS_DIRECT_HPP
auto make_acceptor()
Make a new acceptor listening on a default address.
Definition: conn_factory.hpp:34
Helper class used to initialize main context objects.
Definition: main_ctx_object.hpp:45
Implementation of client-side direct connection.
Definition: direct.hpp:157
auto make_connector(string_view addr_str) -> std::unique_ptr< connection > final
Make a new connector connecting to the specified address.
Definition: direct.hpp:311
void send_to_server(message_id msg_id, const message_view &message)
Sends a message to the server counterpart.
Definition: direct.hpp:34
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
std::shared_ptr< direct_connection_state > shared_state
Alias for shared pointer to direct state type.
Definition: direct.hpp:96
Implementation of server-side direct connection.
Definition: direct.hpp:217
Base class for main context objects.
Definition: main_ctx_object.hpp:71
auto fetch_all(fetch_handler handler) -> bool
Fetches all currently stored messages and calls handler on them.
direct_connection_address(main_ctx_parent parent)
Construction from a parent main context object.
Definition: direct.hpp:103
auto fetch_messages(connection::fetch_handler handler) -> bool final
Fetch all enqueued messages that have been received since last fetch.
Definition: direct.hpp:230
void cleanup() final
Cleans up the connection before destroying it.
Definition: direct.hpp:193
Comparator template for standard string - string span comparisons.
Definition: string_span.hpp:165
Implementation of acceptor for direct connections.
Definition: direct.hpp:244
auto update() -> bool final
Updates the internal state of the connection (called repeatedly).
Definition: direct.hpp:169
auto fetch_from_server(connection::fetch_handler handler) noexcept -> bool
Fetches received messages from the server counterpart.
Definition: direct.hpp:52
auto log_chart_sample(identifier series, float value) noexcept -> named_logging_object &
Stores a new value in the specified chart data series.
Definition: logger.hpp:372
@ in_process
In-process connection (cannot be used for inter-process communication).
direct_acceptor(main_ctx_parent parent, std::shared_ptr< direct_connection_address > address) noexcept
Construction from a parent main context object and an address object.
Definition: direct.hpp:251
auto make_acceptor(string_view addr_str) -> std::unique_ptr< acceptor > final
Make a new acceptor listening on the specified address.
Definition: direct.hpp:303
direct_connection_factory(main_ctx_parent parent)
Construction from a parent main context object with implicit address.
Definition: direct.hpp:299
auto process_accepted(const accept_handler &handler) -> bool final
Lets the handler process the pending accepted connections.
Definition: direct.hpp:262
auto make_connector()
Make a new connector connecting to the default address.
Definition: conn_factory.hpp:40
auto count() const noexcept -> span_size_t
Returns the count of messages in the storage.
Definition: message.hpp:459
Common shared state for a direct connection.
Definition: direct.hpp:27
auto make_connection() -> std::unique_ptr< connection >
Makes a new client-side direct connection.
Definition: direct.hpp:276
basic_address< false > address
Type alias for non-const memory address values.
Definition: address.hpp:203
Class acting as the "address" of a direct connection.
Definition: direct.hpp:93
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
Class storing initially false value and logically or-ing other values.
Definition: bool_aggregate.hpp:16
void send_to_client(message_id msg_id, const message_view &message)
Sends a message to the client counterpart.
Definition: direct.hpp:40
auto is_usable() -> bool final
Checks if the connection is in usable state.
Definition: direct.hpp:164
auto fetch_messages(connection::fetch_handler handler) -> bool final
Fetch all enqueued messages that have been received since last fetch.
Definition: direct.hpp:185
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
auto process_all(process_handler handler) -> bool
Handles the pending server counterparts for created client connections.
Definition: direct.hpp:116
direct_connection_state(main_ctx_parent parent)
Construction from a parent main context object.
Definition: direct.hpp:30
connection_addr_kind
Message bus connection address kind enumeration.
Definition: connection.hpp:30
auto send(message_id msg_id, const message_view &message) -> bool final
Sends a message with the specified id.
Definition: direct.hpp:176
connection_kind
Message bus connection kind bits enumeration.
Definition: connection_kind.hpp:21
Implementation of the connection_info interface for direct connections.
Definition: direct.hpp:135
void push(message_id msg_id, const message_view &message)
Pushes a message into this storage.
Definition: message.hpp:464
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
direct_acceptor(main_ctx_parent parent)
Construction from a parent main context object with implicit address.
Definition: direct.hpp:258
Implementation of connection_factory for direct connections.
Definition: direct.hpp:291
auto connect() -> shared_state
Creates and returns the shared state for a new client connection.
Definition: direct.hpp:108
auto send(message_id msg_id, const message_view &message) -> bool final
Sends a message with the specified id.
Definition: direct.hpp:222
auto fetch_from_client(connection::fetch_handler handler) noexcept -> bool
Fetches received messages from the client counterpart.
Definition: direct.hpp:46

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).