OGLplus  (0.59.0) a C++ wrapper for rendering APIs

asio.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_ASIO_HPP
10 #define EAGINE_MESSAGE_BUS_ASIO_HPP
11 
12 #include "../bool_aggregate.hpp"
13 #include "../branch_predict.hpp"
14 #include "../config/platform.hpp"
15 #include "../flat_map.hpp"
16 #include "../logging/type/exception.hpp"
17 #include "../main_ctx_object.hpp"
18 #include "../maybe_unused.hpp"
19 #include "../serialize/size_and_data.hpp"
20 #include "../timeout.hpp"
21 #include "../value_tracker.hpp"
22 #include "conn_factory.hpp"
23 #include "network.hpp"
24 #include "serialize.hpp"
25 #include <thread>
26 
27 #ifdef __clang__
28 EAGINE_DIAG_PUSH()
29 EAGINE_DIAG_OFF(disabled-macro-expansion)
30 EAGINE_DIAG_OFF(covered-switch-default)
31 EAGINE_DIAG_OFF(zero-as-null-pointer-constant)
32 EAGINE_DIAG_OFF(shorten-64-to-32)
33 EAGINE_DIAG_OFF(deprecated)
34 #endif
35 
36 #include <asio/connect.hpp>
37 #include <asio/io_context.hpp>
38 #include <asio/ip/tcp.hpp>
39 #include <asio/ip/udp.hpp>
40 #if EAGINE_POSIX
41 #include <asio/local/stream_protocol.hpp>
42 #endif
43 #include <asio/read.hpp>
44 #include <asio/write.hpp>
45 
46 #ifdef __clang__
47 EAGINE_DIAG_POP()
48 #endif
49 
50 namespace eagine::msgbus {
51 //------------------------------------------------------------------------------
52 template <connection_addr_kind, connection_protocol>
53 struct asio_types;
54 
55 template <connection_addr_kind Kind, connection_protocol Proto>
56 using asio_socket_type = typename asio_types<Kind, Proto>::socket_type;
57 
58 template <connection_addr_kind Kind, connection_protocol Proto>
59 using asio_endpoint_type =
60  typename asio_socket_type<Kind, Proto>::endpoint_type;
61 
62 template <typename Base, connection_addr_kind, connection_protocol>
63 class asio_connection_info;
64 
65 template <connection_addr_kind, connection_protocol>
66 class asio_connection;
67 
68 template <connection_addr_kind, connection_protocol>
69 class asio_connector;
70 
71 template <connection_addr_kind, connection_protocol>
72 class asio_acceptor;
73 //------------------------------------------------------------------------------
74 template <typename Socket>
75 class asio_flushing_sockets {
76 public:
77  void adopt(Socket& sckt) {
78  _waiting.emplace_back(std::chrono::seconds(10), std::move(sckt));
79  }
80 
81  auto empty() const noexcept -> bool {
82  return _waiting.empty();
83  }
84 
85  void update() noexcept {
86  _waiting.erase(
87  std::remove_if(
88  _waiting.begin(),
89  _waiting.end(),
90  [](auto& waiting) { return bool(std::get<0>(waiting)); }),
91  _waiting.end());
92  }
93 
94 private:
95  std::vector<std::tuple<timeout, Socket>> _waiting;
96 };
97 //------------------------------------------------------------------------------
98 struct asio_common_state {
99  asio::io_context context;
100 
101  asio_common_state() = default;
102  asio_common_state(asio_common_state&&) = delete;
103  asio_common_state(const asio_common_state&) = delete;
104  auto operator=(asio_common_state&&) = delete;
105  auto operator=(const asio_common_state&) = delete;
106 
107  ~asio_common_state() noexcept {
108  while(has_flushing()) {
109  update();
110  std::this_thread::yield();
111  }
112  }
113 
114  template <typename Socket>
115  void adopt_flushing(Socket& sckt) {
116  std::get<asio_flushing_sockets<Socket>>(_flushing).adopt(sckt);
117  }
118 
119  void update() noexcept {
120  _update_flushing(_flushing);
121  }
122 
123  auto has_flushing() const noexcept -> bool {
124  return _has_flushing(_flushing);
125  }
126 
127 private:
128  template <typename Tup, std::size_t... I>
129  static void
130  _do_update_flushing(Tup& flushing, std::index_sequence<I...>) noexcept {
131  (..., std::get<I>(flushing).update());
132  }
133 
134  template <typename Tup>
135  static void _update_flushing(Tup& flushing) noexcept {
136  _do_update_flushing(
137  flushing, std::make_index_sequence<std::tuple_size_v<Tup>>());
138  }
139 
140  template <typename Tup, std::size_t... I>
141  static auto
142  _does_have_flushing(Tup& flushing, std::index_sequence<I...>) noexcept
143  -> bool {
144  return (false || ... || !std::get<I>(flushing).empty());
145  }
146 
147  template <typename Tup>
148  static auto _has_flushing(Tup& flushing) noexcept -> bool {
149  return _does_have_flushing(
150  flushing, std::make_index_sequence<std::tuple_size_v<Tup>>());
151  }
152 
153  std::tuple<
154 #if EAGINE_POSIX
155  asio_flushing_sockets<asio::local::stream_protocol::socket>,
156 #endif
157  asio_flushing_sockets<asio::ip::tcp::socket>,
158  asio_flushing_sockets<asio::ip::udp::socket>>
159  _flushing;
160 };
161 //------------------------------------------------------------------------------
162 template <connection_addr_kind Kind, connection_protocol Proto>
163 struct asio_connection_group : interface<asio_connection_group<Kind, Proto>> {
164 
165  using endpoint_type = asio_endpoint_type<Kind, Proto>;
166 
167  virtual auto pack_into(endpoint_type&, memory::block)
168  -> message_pack_info = 0;
169 
170  virtual void
171  on_sent(const endpoint_type&, const message_pack_info& to_be_removed) = 0;
172 
173  virtual void on_received(const endpoint_type&, memory::const_block) = 0;
174 
175  virtual auto has_received() -> bool = 0;
176 };
177 //------------------------------------------------------------------------------
178 template <connection_addr_kind Kind, connection_protocol Proto>
179 struct asio_connection_state
180  : std::enable_shared_from_this<asio_connection_state<Kind, Proto>>
181  , main_ctx_object {
182  using endpoint_type = asio_endpoint_type<Kind, Proto>;
183  using clock_type = std::chrono::steady_clock;
184  using clock_time = typename clock_type::time_point;
185 
186  std::shared_ptr<asio_common_state> common;
187  asio_socket_type<Kind, Proto> socket;
188  endpoint_type conn_endpoint{};
189 
190  memory::buffer push_buffer{};
191  memory::buffer read_buffer{};
192  memory::buffer write_buffer{};
193  span_size_t total_used_size{0};
194  span_size_t total_sent_size{0};
195  clock_time send_start_time{clock_type::now()};
196  std::int32_t total_sent_messages{0};
197  std::int32_t total_sent_blocks{0};
198  float send_pack_ratio{1.F};
199  const float send_pack_factr{0.5F};
200  bool is_sending{false};
201  bool is_recving{false};
202 
203  asio_connection_state(
204  main_ctx_parent parent,
205  std::shared_ptr<asio_common_state> asio_state,
206  asio_socket_type<Kind, Proto> sock,
207  span_size_t block_size,
208  float pack_factr)
209  : main_ctx_object{EAGINE_ID(AsioConnSt), parent}
210  , common{std::move(asio_state)}
211  , socket{std::move(sock)}
212  , send_pack_factr{pack_factr} {
213  EAGINE_ASSERT(common);
214  common->update();
215 
216  EAGINE_ASSERT(block_size >= min_connection_data_size);
217  push_buffer.resize(block_size);
218  zero(cover(push_buffer));
219  read_buffer.resize(block_size);
220  zero(cover(read_buffer));
221  write_buffer.resize(block_size);
222  zero(cover(write_buffer));
223 
224  log_debug("allocating write buffer of ${size}")
225  .arg(EAGINE_ID(size), EAGINE_ID(ByteSize), write_buffer.size());
226  log_debug("allocating read buffer of ${size}")
227  .arg(EAGINE_ID(size), EAGINE_ID(ByteSize), read_buffer.size());
228  }
229 
230  asio_connection_state(
231  main_ctx_parent parent,
232  const std::shared_ptr<asio_common_state>& asio_state,
233  span_size_t block_size,
234  float pack_factr) noexcept
235  : asio_connection_state{
236  parent,
237  asio_state,
238  asio_socket_type<Kind, Proto>{asio_state->context},
239  block_size,
240  pack_factr} {}
241 
242  auto weak_ref() noexcept {
243  return std::weak_ptr(this->shared_from_this());
244  }
245 
246  auto is_usable() const -> bool {
247  if(EAGINE_LIKELY(common)) {
248  return socket.is_open();
249  }
250  return false;
251  }
252 
253  auto log_usage_stats(span_size_t threshold = 0) -> bool {
254  if(EAGINE_UNLIKELY(total_sent_size >= threshold)) {
255  const auto slack =
256  1.F - float(total_used_size) / float(total_sent_size);
257  const auto msgs_per_block =
258  total_sent_blocks
259  ? float(total_sent_messages) / float(total_sent_blocks)
260  : 0.F;
261  const auto used_per_sec =
262  total_used_size /
263  std::chrono::duration<float>(clock_type::now() - send_start_time)
264  .count();
265  const auto sent_per_sec =
266  total_sent_size /
267  std::chrono::duration<float>(clock_type::now() - send_start_time)
268  .count();
269 
270  log_stat("message slack ratio: ${slack}")
271  .arg(EAGINE_ID(usedSize), EAGINE_ID(ByteSize), total_used_size)
272  .arg(EAGINE_ID(sentSize), EAGINE_ID(ByteSize), total_sent_size)
273  .arg(EAGINE_ID(msgsPerBlk), msgs_per_block)
274  .arg(EAGINE_ID(usedPerSec), EAGINE_ID(ByteSize), used_per_sec)
275  .arg(EAGINE_ID(sentPerSec), EAGINE_ID(ByteSize), sent_per_sec)
276  .arg(EAGINE_ID(addrKind), Kind)
277  .arg(EAGINE_ID(protocol), Proto)
278  .arg(EAGINE_ID(slack), EAGINE_ID(Ratio), slack);
279  return true;
280  }
281  return false;
282  }
283 
284  template <typename Handler>
285  void do_start_send(
287  const endpoint_type&,
289  Handler handler) {
290  asio::async_write(
291  socket, asio::buffer(blk.data(), blk.size()), handler);
292  }
293 
294  template <typename Handler>
295  void do_start_send(
297  const endpoint_type& target_endpoint,
299  Handler handler) {
300  socket.async_send_to(
301  asio::buffer(blk.data(), blk.size()), target_endpoint, handler);
302  }
303 
304  void do_start_send(asio_connection_group<Kind, Proto>& group) {
305  using std::get;
306 
307  endpoint_type target_endpoint{conn_endpoint};
308  const auto packed =
309  group.pack_into(target_endpoint, cover(write_buffer));
310  if(!packed.is_empty() && packed.usage() > send_pack_ratio) {
311  is_sending = true;
312  send_pack_ratio = 1.F;
313  const auto blk = view(write_buffer);
314 
315  log_trace("sending data")
316  .arg(EAGINE_ID(packed), EAGINE_ID(bits), packed.bits())
317  .arg(EAGINE_ID(usedSize), EAGINE_ID(ByteSize), packed.used())
318  .arg(EAGINE_ID(sentSize), EAGINE_ID(ByteSize), packed.total())
319  .arg(EAGINE_ID(block), blk);
320 
321  do_start_send(
322  connection_protocol_tag<Proto>{},
323  target_endpoint,
324  blk,
325  [this, &group, target_endpoint, packed, selfref{weak_ref()}](
326  std::error_code error, std::size_t length) {
327  EAGINE_MAYBE_UNUSED(length);
328  if(const auto self{selfref.lock()}) {
329  if(!error) {
330  EAGINE_ASSERT(span_size(length) == packed.total());
331  log_trace("sent data")
332  .arg(
333  EAGINE_ID(usedSize),
334  EAGINE_ID(ByteSize),
335  packed.used())
336  .arg(
337  EAGINE_ID(sentSize),
338  EAGINE_ID(ByteSize),
339  packed.total());
340 
341  total_used_size += packed.used();
342  total_sent_size += packed.total();
343  total_sent_messages += packed.count();
344  total_sent_blocks += 1;
345 
346  if(this->log_usage_stats(span_size(2U << 27U))) {
347  total_used_size = 0;
348  total_sent_size = 0;
349  send_start_time = clock_type::now();
350  }
351 
352  this->handle_sent(group, target_endpoint, packed);
353  } else {
354  log_error("failed to send data: ${error}")
355  .arg(EAGINE_ID(error), error);
356  this->is_sending = false;
357  this->socket.close();
358  }
359  }
360  });
361  } else {
362  is_sending = false;
363  send_pack_ratio *= send_pack_factr;
364  }
365  }
366 
367  auto start_send(asio_connection_group<Kind, Proto>& group) -> bool {
368  if(!is_sending) {
369  do_start_send(group);
370  }
371  return is_sending;
372  }
373 
374  void handle_sent(
375  asio_connection_group<Kind, Proto>& group,
376  const endpoint_type& target_endpoint,
377  const message_pack_info& to_be_removed) {
378  group.on_sent(target_endpoint, to_be_removed);
379  do_start_send(group);
380  }
381 
382  template <typename Handler>
383  void
384  do_start_receive(stream_protocol_tag, memory::block blk, Handler handler) {
385  asio::async_read(socket, asio::buffer(blk.data(), blk.size()), handler);
386  }
387 
388  template <typename Handler>
389  void
390  do_start_receive(datagram_protocol_tag, memory::block blk, Handler handler) {
391  socket.async_receive_from(
392  asio::buffer(blk.data(), blk.size()), conn_endpoint, handler);
393  }
394 
395  void do_start_receive(asio_connection_group<Kind, Proto>& group) {
396  auto blk = cover(read_buffer);
397 
398  log_trace("receiving data (size: ${size})")
399  .arg(EAGINE_ID(size), EAGINE_ID(ByteSize), blk.size());
400 
401  is_recving = true;
402  do_start_receive(
403  connection_protocol_tag<Proto>{},
404  blk,
405  [this, &group, selfref{weak_ref()}, blk](
406  std::error_code error, std::size_t length) {
407  if(const auto self{selfref.lock()}) {
408  memory::const_block rcvd = head(blk, span_size(length));
409  if(!error) {
410  log_trace("received data (size: ${size})")
411  .arg(EAGINE_ID(block), rcvd)
412  .arg(EAGINE_ID(size), EAGINE_ID(ByteSize), length);
413 
414  this->handle_received(rcvd, group);
415  } else {
416  if(rcvd) {
417  log_warning("failed receiving data: ${error}")
418  .arg(EAGINE_ID(error), error);
419  this->handle_received(rcvd, group);
420  } else {
421  if(error == asio::error::eof) {
422  log_debug("received end-of-file");
423  } else if(error == asio::error::connection_reset) {
424  log_debug("connection reset by peer");
425  } else {
426  log_error("failed to receive data: ${error}")
427  .arg(EAGINE_ID(error), error);
428  }
429  }
430  this->is_recving = false;
431  this->socket.close();
432  }
433  }
434  });
435  }
436 
437  auto start_receive(asio_connection_group<Kind, Proto>& group) -> bool {
438  if(!is_recving) {
439  do_start_receive(group);
440  }
441  return group.has_received();
442  }
443 
444  void handle_received(
445  memory::const_block data,
446  asio_connection_group<Kind, Proto>& group) {
447  group.on_received(conn_endpoint, data);
448  do_start_receive(group);
449  }
450 
451  auto update() -> bool {
452  some_true something_done{};
453  if(const auto count{common->context.poll()}) {
454  log_trace("called ready handlers (count: ${count})")
455  .arg(EAGINE_ID(count), count);
456  something_done();
457  } else {
458  common->context.reset();
459  }
460  return something_done;
461  }
462 
463  void cleanup(asio_connection_group<Kind, Proto>& group) {
464  log_usage_stats();
465  while(is_usable() && start_send(group)) {
466  log_debug("flushing connection outbox");
467  update();
468  }
469  if(is_usable()) {
470  common->adopt_flushing(socket);
471  }
472  common->update();
473  }
474 };
475 //------------------------------------------------------------------------------
476 template <connection_addr_kind Kind, connection_protocol Proto>
477 class asio_connection_base
478  : public asio_connection_info<connection, Kind, Proto>
479  , public main_ctx_object {
480 public:
481  asio_connection_base(
482  main_ctx_parent parent,
483  std::shared_ptr<asio_common_state> asio_state,
484  span_size_t block_size,
485  float pack_factr)
486  : main_ctx_object{EAGINE_ID(AsioConnBs), parent}
487  , _state{std::make_shared<asio_connection_state<Kind, Proto>>(
488  *this,
489  std::move(asio_state),
490  block_size,
491  pack_factr)} {
492  EAGINE_ASSERT(_state);
493  }
494 
495  asio_connection_base(
496  main_ctx_parent parent,
497  std::shared_ptr<asio_common_state> asio_state,
498  asio_socket_type<Kind, Proto> socket,
499  span_size_t block_size,
500  float pack_factr)
501  : main_ctx_object{EAGINE_ID(AsioConnBs), parent}
502  , _state{std::make_shared<asio_connection_state<Kind, Proto>>(
503  *this,
504  std::move(asio_state),
505  std::move(socket),
506  block_size,
507  pack_factr)} {
508  EAGINE_ASSERT(_state);
509  }
510 
511  inline auto conn_state() noexcept -> auto& {
512  EAGINE_ASSERT(_state);
513  return *_state;
514  }
515 
516  auto max_data_size() -> valid_if_positive<span_size_t> final {
517  return {conn_state().write_buffer.size()};
518  }
519 
520  auto is_usable() -> bool final {
521  return conn_state().is_usable();
522  }
523 
524 protected:
525  std::shared_ptr<asio_connection_state<Kind, Proto>> _state;
526 
527  asio_connection_base(
528  main_ctx_parent parent,
529  std::shared_ptr<asio_connection_state<Kind, Proto>> state)
530  : main_ctx_object{EAGINE_ID(AsioConnBs), parent}
531  , _state{std::move(state)} {}
532 };
533 //------------------------------------------------------------------------------
534 template <connection_addr_kind Kind, connection_protocol Proto>
535 class asio_connection
536  : public asio_connection_base<Kind, Proto>
537  , public asio_connection_group<Kind, Proto> {
538 
539  using base = asio_connection_base<Kind, Proto>;
540  using endpoint_type = asio_endpoint_type<Kind, Proto>;
541 
542 public:
543  using base::base;
544  using base::conn_state;
545 
546  auto update() -> bool override {
547  some_true something_done{};
548  if(conn_state().socket.is_open()) {
549  something_done(conn_state().start_receive(*this));
550  something_done(conn_state().start_send(*this));
551  }
552  something_done(conn_state().update());
553  _log_message_counts();
554  return something_done;
555  }
556 
557  auto pack_into(endpoint_type&, memory::block data)
558  -> message_pack_info final {
559  return _outgoing.pack_into(data);
560  }
561 
562  void on_sent(const endpoint_type&, const message_pack_info& to_be_removed)
563  final {
564  return _outgoing.cleanup(to_be_removed);
565  }
566 
567  void on_received(const endpoint_type&, memory::const_block data) final {
568  return _incoming.push(data);
569  }
570 
571  auto has_received() -> bool final {
572  return !_incoming.empty();
573  }
574 
575  auto send(message_id msg_id, const message_view& message) -> bool final {
576  return _outgoing.enqueue(
577  *this, msg_id, message, cover(conn_state().push_buffer));
578  }
579 
580  auto fetch_messages(connection::fetch_handler handler) -> bool final {
581  return _incoming.fetch_messages(*this, handler);
582  }
583 
584  void cleanup() final {
585  this->_log_message_counts();
586  conn_state().cleanup(*this);
587  }
588 
589 protected:
590  void _log_message_counts() noexcept {
591  if constexpr(is_log_level_enabled_v<log_event_severity::stat>) {
592  if(_outgoing_count.has_changed(_outgoing.count())) {
593  this->log_chart_sample(
594  EAGINE_ID(outMsgCnt), float(_outgoing_count.get()));
595  }
596 
597  if(_incoming_count.has_changed(_incoming.count())) {
598  this->log_chart_sample(
599  EAGINE_ID(incMsgCnt), float(_incoming_count.get()));
600  }
601  }
602  }
603 
604 private:
605  connection_outgoing_messages _outgoing{};
606  connection_incoming_messages _incoming{};
607  value_change_div_tracker<span_size_t, 16> _outgoing_count{0};
608  value_change_div_tracker<span_size_t, 16> _incoming_count{0};
609 };
610 //------------------------------------------------------------------------------
611 template <connection_addr_kind Kind>
612 class asio_datagram_client_connection
613  : public asio_connection_base<Kind, connection_protocol::datagram> {
614 
615  using base = asio_connection_base<Kind, connection_protocol::datagram>;
616 
617 public:
618  using base::conn_state;
619 
620  asio_datagram_client_connection(
621  main_ctx_parent parent,
622  std::shared_ptr<asio_connection_state<Kind, connection_protocol::datagram>>
623  state,
624  std::shared_ptr<connection_outgoing_messages> outgoing,
625  std::shared_ptr<connection_incoming_messages> incoming)
626  : base(parent, std::move(state))
627  , _outgoing{std::move(outgoing)}
628  , _incoming{std::move(incoming)} {}
629 
630  auto pack_into(memory::block data) -> message_pack_info {
631  EAGINE_ASSERT(_outgoing);
632  return _outgoing->pack_into(data);
633  }
634 
635  void on_sent(const message_pack_info& to_be_removed) {
636  EAGINE_ASSERT(_outgoing);
637  return _outgoing->cleanup(to_be_removed);
638  }
639 
640  void on_received(memory::const_block data) {
641  EAGINE_ASSERT(_incoming);
642  _incoming->push(data);
643  }
644 
645  auto send(message_id msg_id, const message_view& message) -> bool final {
646  EAGINE_ASSERT(_outgoing);
647  return _outgoing->enqueue(
648  *this, msg_id, message, cover(conn_state().push_buffer));
649  }
650 
651  auto fetch_messages(connection::fetch_handler handler) -> bool final {
652  EAGINE_ASSERT(_incoming);
653  return _incoming->fetch_messages(*this, handler);
654  }
655 
656  auto update() -> bool final {
657  some_true something_done{};
658  something_done(conn_state().update());
659  return something_done;
660  }
661 
662 private:
663  std::shared_ptr<connection_outgoing_messages> _outgoing;
664  std::shared_ptr<connection_incoming_messages> _incoming;
665 };
666 //------------------------------------------------------------------------------
667 template <connection_addr_kind Kind>
668 class asio_datagram_server_connection
669  : public asio_connection_base<Kind, connection_protocol::datagram>
670  , public asio_connection_group<Kind, connection_protocol::datagram> {
671 
672  using base = asio_connection_base<Kind, connection_protocol::datagram>;
673  using endpoint_type =
674  asio_endpoint_type<Kind, connection_protocol::datagram>;
675 
676 public:
677  using base::base;
678  using base::conn_state;
679 
680  auto pack_into(endpoint_type& target, memory::block dest)
681  -> message_pack_info final {
682  EAGINE_ASSERT(_index >= 0);
683  const auto prev_idx{_index};
684  do {
685  if(_index < span_size(_current.size())) {
686  auto pos = _current.begin();
687  std::advance(pos, _index);
688  ++_index;
689  auto& [ep, out_in] = *pos;
690  auto& outgoing = std::get<0>(out_in);
691  EAGINE_ASSERT(outgoing);
692  const auto packed = outgoing->pack_into(dest);
693  if(!packed.is_empty()) {
694  target = ep;
695  return packed;
696  }
697  } else {
698  _index = 0;
699  }
700  } while(_index != prev_idx);
701  return {0};
702  }
703 
704  void on_sent(
705  const endpoint_type& ep,
706  const message_pack_info& to_be_removed) final {
707  _outgoing(ep).cleanup(to_be_removed);
708  }
709 
710  void on_received(const endpoint_type& ep, memory::const_block data) final {
711  _incoming(ep).push(data);
712  }
713 
714  auto has_received() -> bool final {
715  for(auto m : {&_current, &_pending}) {
716  for(const auto& p : *m) {
717  const auto& incoming = std::get<1>(std::get<1>(p));
718  EAGINE_ASSERT(incoming);
719  if(!incoming->empty()) {
720  return true;
721  }
722  }
723  }
724  return false;
725  }
726 
727  auto send(message_id, const message_view&) -> bool final {
728  EAGINE_UNREACHABLE();
729  return false;
730  }
731 
732  auto fetch_messages(connection::fetch_handler) -> bool final {
733  EAGINE_UNREACHABLE();
734  return false;
735  }
736 
737  auto process_accepted(const acceptor::accept_handler& handler) -> bool {
738  some_true something_done;
739  for(auto& p : _pending) {
740  handler(std::make_unique<asio_datagram_client_connection<Kind>>(
741  *this,
742  this->_state,
743  std::get<0>(std::get<1>(p)),
744  std::get<1>(std::get<1>(p))));
745  _current.insert(p);
746  something_done();
747  }
748  _pending.clear();
749  if(something_done) {
750  this->log_debug("accepted datagram endpoints")
751  .arg(EAGINE_ID(current), _current.size());
752  }
753  return something_done;
754  }
755 
756  auto update() -> bool final {
757  some_true something_done{};
758  if(conn_state().socket.is_open()) {
759  something_done(conn_state().start_receive(*this));
760  something_done(conn_state().start_send(*this));
761  } else {
762  this->log_warning("datagram socket is not open");
763  }
764  something_done(conn_state().update());
765  return something_done;
766  }
767 
768  void cleanup() final {
769  conn_state().cleanup(*this);
770  }
771 
772 private:
773  auto _get(const endpoint_type& ep) -> auto& {
774  auto pos = _current.find(ep);
775  if(pos == _current.end()) {
776  pos = _pending.find(ep);
777  if(pos == _pending.end()) {
778  pos = _pending
779  .try_emplace(
780  ep,
781  std::make_shared<connection_outgoing_messages>(),
782  std::make_shared<connection_incoming_messages>())
783  .first;
784  this->log_debug("added pending datagram endpoint")
785  .arg(EAGINE_ID(pending), _pending.size())
786  .arg(EAGINE_ID(current), _current.size());
787  }
788  }
789  return std::get<1>(*pos);
790  }
791 
792  auto _outgoing(const endpoint_type& ep) -> connection_outgoing_messages& {
793  auto& outgoing = std::get<0>(_get(ep));
794  EAGINE_ASSERT(outgoing);
795  return *outgoing;
796  }
797 
798  auto _incoming(const endpoint_type& ep) -> connection_incoming_messages& {
799  auto& incoming = std::get<1>(_get(ep));
800  EAGINE_ASSERT(incoming);
801  return *incoming;
802  }
803 
804  flat_map<
805  endpoint_type,
806  std::tuple<
807  std::shared_ptr<connection_outgoing_messages>,
808  std::shared_ptr<connection_incoming_messages>>>
809  _current{}, _pending{};
810  span_size_t _index{0};
811 };
812 //------------------------------------------------------------------------------
813 // TCP/IPv4
814 //------------------------------------------------------------------------------
815 template <typename Base>
816 class asio_connection_info<
817  Base,
819  connection_protocol::stream> : public Base {
820 public:
821  auto kind() -> connection_kind final {
823  }
824 
825  auto addr_kind() -> connection_addr_kind final {
827  }
828 
829  auto type_id() -> identifier final {
830  return EAGINE_ID(AsioTcpIp4);
831  }
832 };
833 //------------------------------------------------------------------------------
834 template <>
836  using socket_type = asio::ip::tcp::socket;
837 };
838 //------------------------------------------------------------------------------
839 template <>
841  : public asio_connection<
842  connection_addr_kind::ipv4,
843  connection_protocol::stream> {
844 
845  using base =
846  asio_connection<connection_addr_kind::ipv4, connection_protocol::stream>;
847  using base::conn_state;
848 
849  asio::ip::tcp::resolver _resolver;
850  std::tuple<std::string, ipv4_port> _addr;
851  timeout _should_reconnect{std::chrono::seconds{1}, nothing};
852  bool _connecting{false};
853 
854  void
855  _start_connect(asio::ip::tcp::resolver::iterator resolved, ipv4_port port) {
856  auto& ep = conn_state().conn_endpoint = *resolved;
857  ep.port(port);
858 
859  this->log_debug("connecting to ${host}:${port}")
860  .arg(EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
861  .arg(EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
862 
863  conn_state().socket.async_connect(
864  ep, [this, resolved, port](std::error_code error) mutable {
865  if(!error) {
866  this->log_debug("connected on address ${host}:${port}")
867  .arg(
868  EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
869  .arg(
870  EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
871  this->_connecting = false;
872  } else {
873  if(++resolved != asio::ip::tcp::resolver::iterator{}) {
874  this->_start_connect(resolved, port);
875  } else {
876  this
877  ->log_error(
878  "failed to connect on address "
879  "${address}:${port}: "
880  "${error}")
881  .arg(EAGINE_ID(error), error)
882  .arg(
883  EAGINE_ID(host),
884  EAGINE_ID(IpV4Host),
885  std::get<0>(_addr))
886  .arg(
887  EAGINE_ID(port),
888  EAGINE_ID(IpV4Port),
889  std::get<1>(_addr));
890  this->_connecting = false;
891  }
892  }
893  });
894  }
895 
896  void _start_resolve() {
897  _connecting = true;
898  auto& [host, port] = _addr;
899  _resolver.async_resolve(
900  asio::string_view(host.data(), std_size(host.size())),
901  {},
902  [this, port{port}](std::error_code error, auto resolved) {
903  if(!error) {
904  this->_start_connect(resolved, port);
905  } else {
906  this->log_error("failed to resolve address: ${error}")
907  .arg(EAGINE_ID(error), error);
908  this->_connecting = false;
909  }
910  });
911  }
912 
913 public:
914  asio_connector(
915  main_ctx_parent parent,
916  const std::shared_ptr<asio_common_state>& asio_state,
917  string_view addr_str,
918  span_size_t block_size,
919  float pack_factr)
920  : base{parent, asio_state, block_size, pack_factr}
921  , _resolver{asio_state->context}
922  , _addr{parse_ipv4_addr(addr_str)} {}
923 
924  auto update() -> bool final {
925  some_true something_done{};
926  if(conn_state().socket.is_open()) {
927  something_done(conn_state().start_receive(*this));
928  something_done(conn_state().start_send(*this));
929  } else if(!_connecting) {
930  if(_should_reconnect) {
931  _should_reconnect.reset();
932  _start_resolve();
933  something_done();
934  }
935  }
936  something_done(conn_state().update());
937  this->_log_message_counts();
938  return something_done;
939  }
940 };
941 //------------------------------------------------------------------------------
942 template <>
943 class asio_acceptor<connection_addr_kind::ipv4, connection_protocol::stream>
944  : public asio_connection_info<
945  acceptor,
946  connection_addr_kind::ipv4,
947  connection_protocol::stream>
948  , public main_ctx_object {
949 private:
950  std::shared_ptr<asio_common_state> _asio_state;
951  std::tuple<std::string, ipv4_port> _addr;
952  asio::ip::tcp::acceptor _acceptor;
953  asio::ip::tcp::socket _socket;
954  span_size_t _block_size;
955  float _pack_factr;
956 
957  std::vector<asio::ip::tcp::socket> _accepted;
958 
959  void _start_accept() {
960  log_debug("accepting connection on address ${host}:${port}")
961  .arg(EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
962  .arg(EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
963 
964  _socket = asio::ip::tcp::socket(this->_asio_state->context);
965  _acceptor.async_accept(_socket, [this](std::error_code error) {
966  if(!error) {
967  log_debug("accepted connection on address ${host}:${port}")
968  .arg(EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
969  .arg(
970  EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
971  this->_accepted.emplace_back(std::move(this->_socket));
972  } else {
973  log_error(
974  "failed to accept connection on address "
975  "${host}:${port}: "
976  "${error}")
977  .arg(EAGINE_ID(error), error)
978  .arg(EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
979  .arg(
980  EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
981  }
982  _start_accept();
983  });
984  }
985 
986 public:
987  asio_acceptor(
988  main_ctx_parent parent,
989  std::shared_ptr<asio_common_state> asio_state,
990  string_view addr_str,
991  span_size_t block_size,
992  float pack_factr) noexcept
993  : main_ctx_object{EAGINE_ID(AsioAccptr), parent}
994  , _asio_state{std::move(asio_state)}
995  , _addr{parse_ipv4_addr(addr_str)}
996  , _acceptor{_asio_state->context}
997  , _socket{_asio_state->context}
998  , _block_size{block_size}
999  , _pack_factr{pack_factr} {}
1000 
1001  auto update() -> bool final {
1002  EAGINE_ASSERT(this->_asio_state);
1003  some_true something_done{};
1004  if(!_acceptor.is_open()) {
1005  asio::ip::tcp::endpoint endpoint(
1006  asio::ip::tcp::v4(), std::get<1>(_addr));
1007  _acceptor.open(endpoint.protocol());
1008  _acceptor.bind(endpoint);
1009  _acceptor.listen();
1010  _start_accept();
1011  something_done();
1012  }
1013  if(this->_asio_state->context.poll()) {
1014  something_done();
1015  } else {
1016  this->_asio_state->context.reset();
1017  }
1018  return something_done;
1019  }
1020 
1021  auto process_accepted(const accept_handler& handler) -> bool final {
1022  some_true something_done{};
1023  for(auto& socket : _accepted) {
1024  auto conn = std::make_unique<asio_connection<
1025  connection_addr_kind::ipv4,
1026  connection_protocol::stream>>(
1027  *this, _asio_state, std::move(socket), _block_size, _pack_factr);
1028  handler(std::move(conn));
1029  something_done();
1030  }
1031  _accepted.clear();
1032  return something_done;
1033  }
1034 };
1035 //------------------------------------------------------------------------------
1036 // UDP/IPv4
1037 //------------------------------------------------------------------------------
1038 template <typename Base>
1039 class asio_connection_info<
1040  Base,
1042  connection_protocol::datagram> : public Base {
1043 public:
1044  auto kind() -> connection_kind final {
1045  return connection_kind::remote_interprocess;
1046  }
1047 
1048  auto addr_kind() -> connection_addr_kind final {
1049  return connection_addr_kind::ipv4;
1050  }
1051 
1052  auto type_id() -> identifier final {
1053  return EAGINE_ID(AsioUdpIp4);
1054  }
1055 };
1056 //------------------------------------------------------------------------------
1057 template <>
1058 struct asio_types<connection_addr_kind::ipv4, connection_protocol::datagram> {
1059  using socket_type = asio::ip::udp::socket;
1060 };
1061 //------------------------------------------------------------------------------
1062 template <>
1063 class asio_connector<connection_addr_kind::ipv4, connection_protocol::datagram>
1064  : public asio_connection<
1065  connection_addr_kind::ipv4,
1066  connection_protocol::datagram> {
1067 
1068  using base =
1069  asio_connection<connection_addr_kind::ipv4, connection_protocol::datagram>;
1070  using base::conn_state;
1071 
1072  asio::ip::udp::resolver _resolver;
1073  std::tuple<std::string, ipv4_port> _addr;
1074  timeout _should_reconnect{std::chrono::seconds{1}, nothing};
1075  bool _establishing{false};
1076 
1077  void _on_resolve(
1078  const asio::ip::udp::resolver::iterator& resolved,
1079  ipv4_port port) {
1080  auto& ep = conn_state().conn_endpoint = *resolved;
1081  ep.port(port);
1082  conn_state().socket.open(ep.protocol());
1083  this->_establishing = false;
1084 
1085  this->log_debug("resolved address ${host}:${port}")
1086  .arg(EAGINE_ID(host), EAGINE_ID(IpV4Host), std::get<0>(_addr))
1087  .arg(EAGINE_ID(port), EAGINE_ID(IpV4Port), std::get<1>(_addr));
1088  }
1089 
1090  void _start_resolve() {
1091  _establishing = true;
1092  const auto& [host, port] = _addr;
1093  _resolver.async_resolve(
1094  host, {}, [this, port{port}](std::error_code error, auto resolved) {
1095  if(!error) {
1096  this->_on_resolve(resolved, port);
1097  } else {
1098  this->log_error("failed to resolve address: ${error}")
1099  .arg(EAGINE_ID(error), error);
1100  this->_establishing = false;
1101  }
1102  });
1103  }
1104 
1105 public:
1106  asio_connector(
1107  main_ctx_parent parent,
1108  const std::shared_ptr<asio_common_state>& asio_state,
1109  string_view addr_str,
1110  span_size_t block_size,
1111  float pack_factr)
1112  : base{parent, asio_state, block_size, pack_factr}
1113  , _resolver{asio_state->context}
1114  , _addr{parse_ipv4_addr(addr_str)} {}
1115 
1116  auto update() -> bool final {
1117  some_true something_done{};
1118  if(conn_state().socket.is_open()) {
1119  something_done(conn_state().start_receive(*this));
1120  something_done(conn_state().start_send(*this));
1121  } else if(!_establishing) {
1122  if(_should_reconnect) {
1123  _should_reconnect.reset();
1124  _start_resolve();
1125  something_done();
1126  }
1127  }
1128  something_done(conn_state().update());
1129  this->_log_message_counts();
1130  return something_done;
1131  }
1132 };
1133 //------------------------------------------------------------------------------
1134 template <>
1135 class asio_acceptor<connection_addr_kind::ipv4, connection_protocol::datagram>
1136  : public asio_connection_info<
1137  acceptor,
1138  connection_addr_kind::ipv4,
1139  connection_protocol::datagram>
1140  , public main_ctx_object {
1141 private:
1142  std::shared_ptr<asio_common_state> _asio_state;
1143  std::tuple<std::string, ipv4_port> _addr;
1144 
1145  asio_datagram_server_connection<connection_addr_kind::ipv4> _conn;
1146 
1147 public:
1148  asio_acceptor(
1149  main_ctx_parent parent,
1150  std::shared_ptr<asio_common_state> asio_state,
1151  string_view addr_str,
1152  span_size_t block_size,
1153  float pack_factr) noexcept
1154  : main_ctx_object{EAGINE_ID(AsioAccptr), parent}
1155  , _asio_state{std::move(asio_state)}
1156  , _addr{parse_ipv4_addr(addr_str)}
1157  , _conn{
1158  *this,
1159  _asio_state,
1160  asio::ip::udp::socket{
1161  _asio_state->context,
1162  asio::ip::udp::endpoint{asio::ip::udp::v4(), std::get<1>(_addr)}},
1163  block_size,
1164  pack_factr} {}
1165 
1166  auto update() -> bool final {
1167  return _conn.update();
1168  }
1169 
1170  auto process_accepted(const accept_handler& handler) -> bool final {
1171  return _conn.process_accepted(handler);
1172  }
1173 };
1174 //------------------------------------------------------------------------------
1175 // Local/Stream
1176 #if EAGINE_POSIX
1177 //------------------------------------------------------------------------------
1178 template <typename Base>
1179 class asio_connection_info<
1180  Base,
1182  connection_protocol::stream> : public Base {
1183 public:
1184  auto kind() -> connection_kind final {
1185  return connection_kind::local_interprocess;
1186  }
1187 
1188  auto addr_kind() -> connection_addr_kind final {
1189  return connection_addr_kind::filepath;
1190  }
1191 
1192  auto type_id() -> identifier final {
1193  return EAGINE_ID(AsioLclStr);
1194  }
1195 };
1196 //------------------------------------------------------------------------------
1197 template <>
1198 struct asio_types<connection_addr_kind::filepath, connection_protocol::stream> {
1199  using socket_type = asio::local::stream_protocol::socket;
1200 };
1201 //------------------------------------------------------------------------------
1202 template <>
1203 class asio_connector<connection_addr_kind::filepath, connection_protocol::stream>
1204  : public asio_connection<
1205  connection_addr_kind::filepath,
1206  connection_protocol::stream> {
1207  using base = asio_connection<
1208  connection_addr_kind::filepath,
1209  connection_protocol::stream>;
1210 
1211  std::string _addr_str;
1212  timeout _should_reconnect{std::chrono::seconds{1}, nothing};
1213  bool _connecting{false};
1214 
1215  void _start_connect() {
1216  _connecting = true;
1217  this->log_debug("connecting to ${address}")
1218  .arg(EAGINE_ID(address), EAGINE_ID(FsPath), this->_addr_str);
1219 
1220  conn_state().socket.async_connect(
1221  conn_state().conn_endpoint, [this](std::error_code error) mutable {
1222  if(!error) {
1223  this->log_debug("connected on address ${address}")
1224  .arg(EAGINE_ID(address), EAGINE_ID(FsPath), _addr_str);
1225  _connecting = false;
1226  } else {
1227  this->log_error("failed to connect: ${error}")
1228  .arg(EAGINE_ID(error), error);
1229  _connecting = false;
1230  }
1231  });
1232  }
1233 
1234  static inline auto _fix_addr(string_view addr_str) noexcept {
1235  return addr_str ? addr_str : string_view{"/tmp/eagine-msgbus.socket"};
1236  }
1237 
1238 public:
1239  asio_connector(
1240  main_ctx_parent parent,
1241  const std::shared_ptr<asio_common_state>& asio_state,
1242  string_view addr_str,
1243  span_size_t block_size,
1244  float pack_factr)
1245  : base{parent, asio_state, block_size, pack_factr}
1246  , _addr_str{_fix_addr(addr_str)} {
1247  conn_state().conn_endpoint = {_addr_str.c_str()};
1248  }
1249 
1250  auto update() -> bool final {
1251  some_true something_done{};
1252  if(conn_state().socket.is_open()) {
1253  something_done(conn_state().start_receive(*this));
1254  something_done(conn_state().start_send(*this));
1255  } else if(!_connecting) {
1256  if(_should_reconnect) {
1257  _should_reconnect.reset();
1258  _start_connect();
1259  something_done();
1260  }
1261  }
1262  something_done(conn_state().update());
1263  this->_log_message_counts();
1264  return something_done;
1265  }
1266 };
1267 //------------------------------------------------------------------------------
1268 template <>
1269 class asio_acceptor<connection_addr_kind::filepath, connection_protocol::stream>
1270  : public asio_connection_info<
1271  acceptor,
1272  connection_addr_kind::filepath,
1273  connection_protocol::stream>
1274  , public main_ctx_object {
1275 private:
1276  std::shared_ptr<asio_common_state> _asio_state;
1277  std::string _addr_str;
1278  asio::local::stream_protocol::acceptor _acceptor;
1279  span_size_t _block_size;
1280  float _pack_factr;
1281  bool _accepting{false};
1282 
1283  std::vector<asio::local::stream_protocol::socket> _accepted;
1284 
1285  void _start_accept() {
1286  log_debug("accepting connection on address ${address}")
1287  .arg(EAGINE_ID(address), EAGINE_ID(FsPath), _addr_str);
1288 
1289  _accepting = true;
1290  _acceptor.async_accept([this](
1291  std::error_code error,
1292  asio::local::stream_protocol::socket socket) {
1293  if(error) {
1294  this->_accepting = false;
1295  this
1296  ->log_error(
1297  "failed to accept connection on address ${address}: "
1298  "${error}")
1299  .arg(EAGINE_ID(error), error)
1300  .arg(EAGINE_ID(address), EAGINE_ID(FsPath), _addr_str);
1301  } else {
1302  this->log_debug("accepted connection on address ${address}")
1303  .arg(EAGINE_ID(address), EAGINE_ID(FsPath), _addr_str);
1304  this->_accepted.emplace_back(std::move(socket));
1305  }
1306  _start_accept();
1307  });
1308  }
1309 
1310  static inline auto _fix_addr(string_view addr_str) noexcept {
1311  return addr_str ? addr_str : string_view{"/tmp/eagine-msgbus.socket"};
1312  }
1313 
1314  static inline auto _prepare(
1315  std::shared_ptr<asio_common_state> asio_state,
1316  string_view addr_str) -> std::shared_ptr<asio_common_state> {
1317  std::remove(c_str(addr_str));
1318  return asio_state;
1319  }
1320 
1321 public:
1322  asio_acceptor(
1323  main_ctx_parent parent,
1324  std::shared_ptr<asio_common_state> asio_state,
1325  string_view addr_str, span_size_t block_size, float pack_factr) noexcept
1326  : main_ctx_object{EAGINE_ID(AsioAccptr), parent}
1327  , _asio_state{_prepare(std::move(asio_state), _fix_addr(addr_str))}
1328  , _addr_str{to_string(_fix_addr(addr_str))}
1329  , _acceptor{
1330  _asio_state->context,
1331  asio::local::stream_protocol::endpoint(_addr_str.c_str())}
1332  , _block_size{block_size}
1333  , _pack_factr{pack_factr} {
1334  }
1335 
1336  ~asio_acceptor() noexcept override {
1337  try {
1338  std::remove(_addr_str.c_str());
1339  } catch(...) {
1340  }
1341  }
1342 
1343  asio_acceptor(asio_acceptor&&) = delete;
1344  asio_acceptor(const asio_acceptor&) = delete;
1345  auto operator=(asio_acceptor&&) = delete;
1346  auto operator=(const asio_acceptor&) = delete;
1347 
1348  auto update() -> bool final {
1349  EAGINE_ASSERT(this->_asio_state);
1350  some_true something_done{};
1351  if(EAGINE_UNLIKELY(!_acceptor.is_open())) {
1352  _acceptor = {
1353  _asio_state->context,
1354  asio::local::stream_protocol::endpoint(_addr_str.c_str())};
1355  something_done();
1356  }
1357  if(_acceptor.is_open() && !_accepting) {
1358  _start_accept();
1359  something_done();
1360  }
1361  if(this->_asio_state->context.poll()) {
1362  something_done();
1363  } else {
1364  this->_asio_state->context.reset();
1365  }
1366  return something_done;
1367  }
1368 
1369  auto process_accepted(const accept_handler& handler) -> bool final {
1370  some_true something_done{};
1371  for(auto& socket : _accepted) {
1372  auto conn = std::make_unique<asio_connection<
1373  connection_addr_kind::filepath,
1374  connection_protocol::stream>>(
1375  *this, _asio_state, std::move(socket), _block_size, _pack_factr);
1376  handler(std::move(conn));
1377  something_done();
1378  }
1379  _accepted.clear();
1380  return something_done;
1381  }
1382 };
1383 //------------------------------------------------------------------------------
1384 #endif // EAGINE_POSIX
1385 //------------------------------------------------------------------------------
1386 // Factory
1387 //------------------------------------------------------------------------------
1388 template <connection_addr_kind Kind, connection_protocol Proto>
1389 class asio_connection_factory
1390  : public asio_connection_info<connection_factory, Kind, Proto>
1391  , public main_ctx_object {
1392 private:
1393  std::shared_ptr<asio_common_state> _asio_state;
1394 
1395  template <connection_addr_kind K, connection_protocol P>
1396  static constexpr auto _default_block_size(
1397  connection_addr_kind_tag<K>,
1398  connection_protocol_tag<P>) noexcept -> span_size_t {
1399  return 4 * 1024;
1400  }
1401 
1402  template <connection_addr_kind K, connection_protocol P>
1403  static constexpr auto _default_pack_factr(
1404  connection_addr_kind_tag<K>,
1405  connection_protocol_tag<P>) noexcept -> float {
1406  if(K == connection_addr_kind::filepath) {
1407  return 0.125F;
1408  }
1409  if(P == connection_protocol::stream) {
1410  return 0.75F;
1411  }
1412  return 0.5F;
1413  }
1414 
1415  template <connection_addr_kind K>
1416  static constexpr auto _default_block_size(
1417  connection_addr_kind_tag<K>,
1418  datagram_protocol_tag) noexcept -> span_size_t {
1419  return min_connection_data_size;
1420  }
1421 
1422  span_size_t _block_size{default_block_size()};
1423  float _pack_factr{0.5F};
1424 
1425 public:
1426  using connection_factory::make_acceptor;
1427  using connection_factory::make_connector;
1428 
1429  static constexpr auto default_block_size() noexcept -> span_size_t {
1430  return _default_block_size(
1431  connection_addr_kind_tag<Kind>{}, connection_protocol_tag<Proto>{});
1432  }
1433 
1434  static constexpr auto default_pack_factor() noexcept -> float {
1435  return _default_pack_factr(
1436  connection_addr_kind_tag<Kind>{}, connection_protocol_tag<Proto>{});
1437  }
1438 
1439  asio_connection_factory(
1440  main_ctx_parent parent,
1441  std::shared_ptr<asio_common_state> asio_state,
1442  span_size_t block_size,
1443  float pack_factr) noexcept
1444  : main_ctx_object{EAGINE_ID(AsioConnFc), parent}
1445  , _asio_state{std::move(asio_state)}
1446  , _block_size{block_size}
1447  , _pack_factr{pack_factr} {}
1448 
1449  asio_connection_factory(
1450  main_ctx_parent parent,
1451  span_size_t block_size,
1452  float pack_factr)
1453  : asio_connection_factory{
1454  parent,
1455  std::make_shared<asio_common_state>(),
1456  block_size,
1457  pack_factr} {}
1458 
1459  asio_connection_factory(main_ctx_parent parent)
1460  : asio_connection_factory{
1461  parent,
1462  default_block_size(),
1463  default_pack_factor()} {}
1464 
1465  auto make_acceptor(string_view addr_str)
1466  -> std::unique_ptr<acceptor> final {
1467  return std::make_unique<asio_acceptor<Kind, Proto>>(
1468  *this, _asio_state, addr_str, _block_size, _pack_factr);
1469  }
1470 
1471  auto make_connector(string_view addr_str)
1472  -> std::unique_ptr<connection> final {
1473  return std::make_unique<asio_connector<Kind, Proto>>(
1474  *this, _asio_state, addr_str, _block_size, _pack_factr);
1475  }
1476 };
1477 //------------------------------------------------------------------------------
1478 using asio_tcp_ipv4_connection_factory = asio_connection_factory<
1479  connection_addr_kind::ipv4,
1480  connection_protocol::stream>;
1481 //------------------------------------------------------------------------------
1482 using asio_udp_ipv4_connection_factory = asio_connection_factory<
1483  connection_addr_kind::ipv4,
1484  connection_protocol::datagram>;
1485 //------------------------------------------------------------------------------
1486 #if EAGINE_POSIX
1487 using asio_local_stream_connection_factory = asio_connection_factory<
1488  connection_addr_kind::filepath,
1489  connection_protocol::stream>;
1490 #endif // EAGINE_POSIX
1491 //------------------------------------------------------------------------------
1492 } // namespace eagine::msgbus
1493 
1494 #endif // EAGINE_MESSAGE_BUS_ASIO_HPP
auto log_debug(string_view format) noexcept
Create a log message entry for debugging, with specified format.
Definition: logger.hpp:341
const main_ctx_object_parent_info & main_ctx_parent
Alias for main_ctx_object_parent_info parameter type.
Definition: main_ctx_fwd.hpp:24
std::ptrdiff_t span_size_t
Signed span size type used by eagine.
Definition: types.hpp:36
basic_string_span< const char > string_view
Alias for const string views.
Definition: string_span.hpp:116
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
static constexpr auto c_str(memory::basic_span< C, P, S > s) -> std::enable_if_t< std::is_convertible_v< memory::basic_span< C, P, S >, basic_string_span< C, P, S >>, basic_c_str< C, P, S >>
Functions that construct a basic_c_str from a basic_string_span.
Definition: string_span.hpp:226
auto log_warning(string_view format) noexcept
Create a log message entry for warning, with specified format.
Definition: logger.hpp:323
static constexpr auto span_size(T v) noexcept
Converts argument to span size type.
Definition: types.hpp:59
static constexpr auto cover(T *addr, S size) noexcept -> span_if_mutable< T >
Creates a span starting at the specified pointer and specified length.
Definition: span.hpp:465
basic_block< true > const_block
Alias for const byte memory span.
Definition: block.hpp:32
static auto parse_ipv4_addr(string_view addr_str) -> std::tuple< std::string, ipv4_port >
Parses a IPv4 hostname:port pair,.
Definition: network.hpp:26
auto log_trace(string_view format) noexcept
Create a log message entry for tracing, with specified format.
Definition: logger.hpp:347
static constexpr auto view(T *addr, S size) noexcept -> const_span< T >
Creates a view starting at the specified pointer and specified length.
Definition: span.hpp:458
@ endpoint
Message bus client endpoint.
static constexpr auto head(basic_span< T, P, S > s, L l) noexcept -> basic_span< T, P, S >
Returns the first l elements from the front of a span.
Definition: span_algo.hpp:99
main_ctx_object(identifier obj_id, main_ctx_parent parent) noexcept
Initialization from object id and parent.
Definition: main_ctx_object.hpp:77
basic_block< false > block
Alias for non-const byte memory span.
Definition: block.hpp:27
@ remote_interprocess
Inter-process connection for remote communucation.
@ error
Error log entries, indicating serious problems.
auto log_stat(string_view format) noexcept
Create a log message entry for statistic, with specified format.
Definition: logger.hpp:335
connection_protocol_tag< connection_protocol::stream > stream_protocol_tag
Tag type for specifying stream connection protocols.
Definition: connection.hpp:87
basic_address< false > address
Type alias for non-const memory address values.
Definition: address.hpp:203
static constexpr auto std_size(T v) noexcept
Converts argument to std size type.
Definition: types.hpp:52
connection_protocol_tag< connection_protocol::datagram > datagram_protocol_tag
Tag type for specifying datagram connection protocols.
Definition: connection.hpp:93
@ kind
The node kind has appeared or changed.
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
constexpr const span_size_t min_connection_data_size
The minimum guaranteed block size that can be sent through bus connections.
Definition: connection.hpp:25
connection_addr_kind
Message bus connection address kind enumeration.
Definition: connection.hpp:30
static auto zero(basic_span< T, P, S > spn) -> std::enable_if_t< std::is_integral_v< T >||std::is_floating_point_v< T >, basic_span< T, P, S >>
Fills a span with zero value of type T.
Definition: span_algo.hpp:548
callable_ref< bool(message_id, message_age, const message_view &)> fetch_handler
Alias for fetch handler callable reference type.
Definition: connection.hpp:118
connection_kind
Message bus connection kind bits enumeration.
Definition: connection_kind.hpp:21
unsigned short int ipv4_port
Alias for IPv4 port number value type.
Definition: network.hpp:22
@ stream
Reliable stream protocol.
callable_ref< void(std::unique_ptr< connection >)> accept_handler
Alias for accepted connection handler callable reference type.
Definition: acceptor.hpp:23
basic_identifier< 10, 6, default_identifier_char_set, identifier_t > identifier
Default identifier type used throughout the project.
Definition: identifier.hpp:346
auto log_error(string_view format) noexcept
Create a log message entry for error, with specified format.
Definition: logger.hpp:317
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).