9 #ifndef EAGINE_MESSAGE_BUS_ASIO_HPP
10 #define EAGINE_MESSAGE_BUS_ASIO_HPP
12 #include "../bool_aggregate.hpp"
13 #include "../branch_predict.hpp"
14 #include "../config/platform.hpp"
15 #include "../flat_map.hpp"
16 #include "../logging/type/exception.hpp"
17 #include "../main_ctx_object.hpp"
18 #include "../maybe_unused.hpp"
19 #include "../serialize/size_and_data.hpp"
20 #include "../timeout.hpp"
21 #include "../value_tracker.hpp"
29 EAGINE_DIAG_OFF(disabled-macro-expansion)
30 EAGINE_DIAG_OFF(covered-switch-default)
31 EAGINE_DIAG_OFF(
zero-as-null-pointer-constant)
32 EAGINE_DIAG_OFF(shorten-64-to-32)
33 EAGINE_DIAG_OFF(deprecated)
36 #include <asio/connect.hpp>
37 #include <asio/io_context.hpp>
38 #include <asio/ip/tcp.hpp>
39 #include <asio/ip/udp.hpp>
41 #include <asio/local/stream_protocol.hpp>
43 #include <asio/read.hpp>
44 #include <asio/write.hpp>
// Forward declarations and aliases that select ASIO types from a
// (connection_addr_kind, connection_protocol) pair.
52 template <connection_addr_kind, connection_protocol>
// Socket type published by asio_types<Kind, Proto>.
55 template <connection_addr_kind Kind, connection_protocol Proto>
56 using asio_socket_type =
typename asio_types<Kind, Proto>::socket_type;
// Endpoint type taken from the selected socket type.
58 template <connection_addr_kind Kind, connection_protocol Proto>
59 using asio_endpoint_type =
60 typename asio_socket_type<Kind, Proto>::endpoint_type;
// Forward declaration; specializations add connection information on top of
// the given Base class.
62 template <
typename Base, connection_addr_kind, connection_protocol>
63 class asio_connection_info;
// Forward declaration of the generic connection class template.
65 template <connection_addr_kind, connection_protocol>
66 class asio_connection;
// NOTE(review): the declarations that follow the next two template heads are
// not visible in this listing.
68 template <connection_addr_kind, connection_protocol>
71 template <connection_addr_kind, connection_protocol>
// Container of adopted sockets, each stored together with a timeout.
74 template <
typename Socket>
75 class asio_flushing_sockets {
// Moves the socket out of `sckt` and stores it with a 10 second timeout.
77 void adopt(Socket& sckt) {
78 _waiting.emplace_back(std::chrono::seconds(10), std::move(sckt));
// Returns true when no sockets are stored.
81 auto empty() const noexcept ->
bool {
82 return _waiting.empty();
// Processes the stored entries; the visible predicate converts the timeout
// element of an entry to bool.
// NOTE(review): presumably entries with an expired timeout are erased here -
// the enclosing call is not visible in this listing, confirm.
85 void update() noexcept {
90 [](
auto& waiting) { return bool(std::get<0>(waiting)); }),
// Stored (timeout, socket) entries.
95 std::vector<std::tuple<timeout, Socket>> _waiting;
// State shared between connections: the ASIO io_context and a tuple of
// per-socket-type asio_flushing_sockets containers.
98 struct asio_common_state {
99 asio::io_context context;
// Default constructible; neither copyable nor movable.
101 asio_common_state() =
default;
102 asio_common_state(asio_common_state&&) =
delete;
103 asio_common_state(
const asio_common_state&) =
delete;
104 auto operator=(asio_common_state&&) =
delete;
105 auto operator=(
const asio_common_state&) =
delete;
// Loops while any flushing socket remains, yielding the thread in the loop.
107 ~asio_common_state() noexcept {
108 while(has_flushing()) {
110 std::this_thread::yield();
// Hands `sckt` over to the flushing container matching its socket type.
114 template <
typename Socket>
115 void adopt_flushing(Socket& sckt) {
116 std::get<asio_flushing_sockets<Socket>>(_flushing).adopt(sckt);
// Calls update() on every flushing container.
119 void update() noexcept {
120 _update_flushing(_flushing);
// Returns true when at least one flushing container is not empty.
123 auto has_flushing() const noexcept ->
bool {
124 return _has_flushing(_flushing);
// Helper: fold expression calling update() on each tuple element.
128 template <
typename Tup, std::size_t... I>
130 _do_update_flushing(Tup& flushing, std::index_sequence<I...>) noexcept {
131 (..., std::get<I>(flushing).update());
// Helper: builds the index sequence covering all elements of the tuple.
134 template <
typename Tup>
135 static void _update_flushing(Tup& flushing) noexcept {
137 flushing, std::make_index_sequence<std::tuple_size_v<Tup>>());
// Helper: fold expression that is true when any tuple element is non-empty.
140 template <
typename Tup, std::size_t... I>
142 _does_have_flushing(Tup& flushing, std::index_sequence<I...>) noexcept
144 return (
false || ... || !std::get<I>(flushing).empty());
// Helper: builds the index sequence covering all elements of the tuple.
147 template <
typename Tup>
148 static auto _has_flushing(Tup& flushing) noexcept ->
bool {
149 return _does_have_flushing(
150 flushing, std::make_index_sequence<std::tuple_size_v<Tup>>());
// Element types of the flushing tuple: one container per socket type.
155 asio_flushing_sockets<asio::local::stream_protocol::socket>,
157 asio_flushing_sockets<asio::ip::tcp::socket>,
158 asio_flushing_sockets<asio::ip::udp::socket>>
// Interface through which asio_connection_state talks to the owner of the
// message queues (packing outgoing data, reporting sent and received data).
162 template <connection_addr_kind Kind, connection_protocol Proto>
163 struct asio_connection_group : interface<asio_connection_group<Kind, Proto>> {
165 using endpoint_type = asio_endpoint_type<Kind, Proto>;
// Tail of a pure virtual function returning message_pack_info.
// NOTE(review): presumably pack_into - its head is not visible here.
168 -> message_pack_info = 0;
// Pure virtual notification that the packed messages described by
// `to_be_removed` were sent to the given endpoint.
171 on_sent(
const endpoint_type&,
const message_pack_info& to_be_removed) = 0;
// Pure virtual query for received data.
175 virtual auto has_received() ->
bool = 0;
// Per-connection state: the socket, the I/O buffers, send statistics and the
// asynchronous send/receive logic. Handlers hold a weak reference obtained
// through enable_shared_from_this and lock it before doing their work.
178 template <connection_addr_kind Kind, connection_protocol Proto>
179 struct asio_connection_state
180 : std::enable_shared_from_this<asio_connection_state<Kind, Proto>>
182 using endpoint_type = asio_endpoint_type<Kind, Proto>;
183 using clock_type = std::chrono::steady_clock;
184 using clock_time =
typename clock_type::time_point;
// Shared io_context holder, the socket and the connection endpoint.
186 std::shared_ptr<asio_common_state> common;
187 asio_socket_type<Kind, Proto> socket;
188 endpoint_type conn_endpoint{};
// Buffers; all three are resized to block_size in the constructor.
190 memory::buffer push_buffer{};
191 memory::buffer read_buffer{};
192 memory::buffer write_buffer{};
// Send statistics used by log_usage_stats.
195 clock_time send_start_time{clock_type::now()};
196 std::int32_t total_sent_messages{0};
197 std::int32_t total_sent_blocks{0};
// A packed block is sent only when its usage exceeds send_pack_ratio; the
// ratio is set to 1.F on send and multiplied by send_pack_factr in
// do_start_send.
198 float send_pack_ratio{1.F};
199 const float send_pack_factr{0.5F};
// Flags tracking whether an asynchronous send / receive is in progress.
200 bool is_sending{
false};
201 bool is_recving{
false};
// Constructs the state from an existing socket; asserts that the common
// state is set and resizes the buffers to block_size.
203 asio_connection_state(
205 std::shared_ptr<asio_common_state> asio_state,
206 asio_socket_type<Kind, Proto> sock,
210 , common{std::move(asio_state)}
211 , socket{std::move(sock)}
212 , send_pack_factr{pack_factr} {
213 EAGINE_ASSERT(common);
217 push_buffer.resize(block_size);
219 read_buffer.resize(block_size);
221 write_buffer.resize(block_size);
224 log_debug(
"allocating write buffer of ${size}")
226 log_debug(
"allocating read buffer of ${size}")
// Delegating constructor creating a new socket on the shared io_context.
230 asio_connection_state(
232 const std::shared_ptr<asio_common_state>& asio_state,
234 float pack_factr) noexcept
235 : asio_connection_state{
238 asio_socket_type<Kind, Proto>{asio_state->context},
// Returns a weak pointer to this state for capture in async handlers.
242 auto weak_ref() noexcept {
243 return std::weak_ptr(this->shared_from_this());
// When the common state is set, reports whether the socket is open.
246 auto is_usable() const ->
bool {
247 if(EAGINE_LIKELY(common)) {
248 return socket.is_open();
// Logs send statistics once total_sent_size reaches `threshold`: the slack
// ratio (1 - used/sent), messages per block and per-second rates measured
// from send_start_time.
253 auto log_usage_stats(
span_size_t threshold = 0) ->
bool {
254 if(EAGINE_UNLIKELY(total_sent_size >= threshold)) {
256 1.F - float(total_used_size) / float(total_sent_size);
257 const auto msgs_per_block =
259 ? float(total_sent_messages) / float(total_sent_blocks)
261 const auto used_per_sec =
263 std::chrono::duration<float>(clock_type::now() - send_start_time)
265 const auto sent_per_sec =
267 std::chrono::duration<float>(clock_type::now() - send_start_time)
270 log_stat(
"message slack ratio: ${slack}")
273 .arg(
EAGINE_ID(msgsPerBlk), msgs_per_block)
// Send helper overload that writes the block to the socket and ignores the
// endpoint argument.
// NOTE(review): presumably the stream-protocol overload; the call head is not
// visible here.
284 template <
typename Handler>
287 const endpoint_type&,
291 socket, asio::buffer(blk.data(), blk.size()), handler);
// Send helper overload using async_send_to with an explicit target endpoint.
294 template <
typename Handler>
297 const endpoint_type& target_endpoint,
300 socket.async_send_to(
301 asio::buffer(blk.data(), blk.size()), target_endpoint, handler);
// Asks the group to pack outgoing messages into write_buffer and, when the
// result is non-empty and its usage exceeds send_pack_ratio, starts an
// asynchronous send of the buffer.
304 void do_start_send(asio_connection_group<Kind, Proto>& group) {
307 endpoint_type target_endpoint{conn_endpoint};
309 group.pack_into(target_endpoint,
cover(write_buffer));
310 if(!packed.is_empty() && packed.usage() > send_pack_ratio) {
312 send_pack_ratio = 1.F;
313 const auto blk =
view(write_buffer);
322 connection_protocol_tag<Proto>{},
// Completion handler: captures `this`, the group by reference and a weak
// self-reference that is locked before the state is used.
325 [
this, &group, target_endpoint, packed, selfref{weak_ref()}](
326 std::error_code
error, std::size_t length) {
327 EAGINE_MAYBE_UNUSED(length);
328 if(
const auto self{selfref.lock()}) {
330 EAGINE_ASSERT(
span_size(length) == packed.total());
// Accumulate statistics for log_usage_stats.
341 total_used_size += packed.used();
342 total_sent_size += packed.total();
343 total_sent_messages += packed.count();
344 total_sent_blocks += 1;
// 2U << 27U equals 2^28; the visible follow-up restarts the measurement
// interval.
346 if(this->log_usage_stats(
span_size(2U << 27U))) {
349 send_start_time = clock_type::now();
// Reports the sent data to the group and chains the next send.
352 this->handle_sent(group, target_endpoint, packed);
// Failure path: log the error, clear is_sending and close the socket.
354 log_error(
"failed to send data: ${error}")
356 this->is_sending =
false;
357 this->socket.close();
// Scales send_pack_ratio by send_pack_factr.
// NOTE(review): presumably the branch taken when nothing was sent - the
// enclosing else is not visible here.
363 send_pack_ratio *= send_pack_factr;
// Entry point for sending; calls do_start_send.
// NOTE(review): the guarding condition and the return value are not visible.
367 auto start_send(asio_connection_group<Kind, Proto>& group) ->
bool {
369 do_start_send(group);
// Notifies the group about sent data, then starts the next send.
375 asio_connection_group<Kind, Proto>& group,
376 const endpoint_type& target_endpoint,
377 const message_pack_info& to_be_removed) {
378 group.on_sent(target_endpoint, to_be_removed);
379 do_start_send(group);
// Receive helper overload using asio::async_read on the socket.
382 template <
typename Handler>
385 asio::async_read(socket, asio::buffer(blk.data(), blk.size()), handler);
// Receive helper overload using async_receive_from; the sender endpoint is
// written into conn_endpoint.
388 template <
typename Handler>
391 socket.async_receive_from(
392 asio::buffer(blk.data(), blk.size()), conn_endpoint, handler);
// Starts an asynchronous receive into read_buffer.
395 void do_start_receive(asio_connection_group<Kind, Proto>& group) {
396 auto blk =
cover(read_buffer);
398 log_trace(
"receiving data (size: ${size})")
403 connection_protocol_tag<Proto>{},
// Completion handler: captures `this`, the group by reference and a weak
// self-reference that is locked before the state is used.
405 [
this, &group, selfref{weak_ref()}, blk](
406 std::error_code
error, std::size_t length) {
407 if(
const auto self{selfref.lock()}) {
410 log_trace(
"received data (size: ${size})")
414 this->handle_received(rcvd, group);
419 this->handle_received(rcvd, group);
// eof and connection_reset are distinguished from other errors before
// "failed to receive data" is logged; is_recving is cleared and the socket
// is closed.
421 if(error == asio::error::eof) {
423 }
else if(error == asio::error::connection_reset) {
426 log_error(
"failed to receive data: ${error}")
430 this->is_recving =
false;
431 this->socket.close();
// Entry point for receiving; returns whether the group has received data.
437 auto start_receive(asio_connection_group<Kind, Proto>& group) ->
bool {
439 do_start_receive(group);
441 return group.has_received();
// Passes received data to the group and starts the next receive.
444 void handle_received(
446 asio_connection_group<Kind, Proto>& group) {
447 group.on_received(conn_endpoint, data);
448 do_start_receive(group);
// Polls the shared io_context for ready handlers and resets it; reports
// whether something was done.
451 auto update() ->
bool {
452 some_true something_done{};
453 if(
const auto count{common->context.poll()}) {
454 log_trace(
"called ready handlers (count: ${count})")
458 common->context.reset();
460 return something_done;
// Loops while the state is usable and start_send returns true, then hands
// the socket over to the common state's flushing sockets.
463 void cleanup(asio_connection_group<Kind, Proto>& group) {
465 while(is_usable() && start_send(group)) {
470 common->adopt_flushing(socket);
// Common base of the ASIO connections; owns the shared connection state.
476 template <connection_addr_kind Kind, connection_protocol Proto>
477 class asio_connection_base
478 :
public asio_connection_info<connection, Kind, Proto>
479 ,
public main_ctx_object {
// Creates a new connection state on the given common state.
481 asio_connection_base(
483 std::shared_ptr<asio_common_state> asio_state,
487 , _state{std::make_shared<asio_connection_state<Kind, Proto>>(
489 std::move(asio_state),
492 EAGINE_ASSERT(_state);
// Creates a new connection state that takes an existing socket.
495 asio_connection_base(
497 std::shared_ptr<asio_common_state> asio_state,
498 asio_socket_type<Kind, Proto> socket,
502 , _state{std::make_shared<asio_connection_state<Kind, Proto>>(
504 std::move(asio_state),
508 EAGINE_ASSERT(_state);
// Accessor for the connection state; asserts that it is set.
511 inline auto conn_state() noexcept -> auto& {
512 EAGINE_ASSERT(_state);
// Maximum data size is the size of the state's write buffer.
516 auto max_data_size() -> valid_if_positive<span_size_t>
final {
517 return {conn_state().write_buffer.size()};
// Forwards to the state's is_usable().
520 auto is_usable() ->
bool final {
521 return conn_state().is_usable();
525 std::shared_ptr<asio_connection_state<Kind, Proto>> _state;
// Constructor that shares an already existing connection state.
527 asio_connection_base(
529 std::shared_ptr<asio_connection_state<Kind, Proto>> state)
531 , _state{std::move(state)} {}
// Connection that owns its outgoing and incoming message queues and acts as
// the asio_connection_group for its own connection state.
534 template <connection_addr_kind Kind, connection_protocol Proto>
535 class asio_connection
536 :
public asio_connection_base<Kind, Proto>
537 ,
public asio_connection_group<Kind, Proto> {
539 using base = asio_connection_base<Kind, Proto>;
540 using endpoint_type = asio_endpoint_type<Kind, Proto>;
544 using base::conn_state;
// While the socket is open starts receive and send; then updates the state
// and logs message counts.
546 auto update() ->
bool override {
547 some_true something_done{};
548 if(conn_state().socket.is_open()) {
549 something_done(conn_state().start_receive(*
this));
550 something_done(conn_state().start_send(*
this));
552 something_done(conn_state().update());
553 _log_message_counts();
554 return something_done;
// Packs queued outgoing messages into `data`.
558 -> message_pack_info
final {
559 return _outgoing.pack_into(data);
// Lets the outgoing queue clean up the messages that were sent.
562 void on_sent(
const endpoint_type&,
const message_pack_info& to_be_removed)
564 return _outgoing.cleanup(to_be_removed);
// Pushes received data into the incoming queue.
568 return _incoming.push(data);
// True when the incoming queue is not empty.
571 auto has_received() ->
bool final {
572 return !_incoming.empty();
// Enqueues a message for sending, using the state's push buffer.
575 auto send(message_id msg_id,
const message_view& message) ->
bool final {
576 return _outgoing.enqueue(
577 *
this, msg_id, message,
cover(conn_state().push_buffer));
// Passes messages from the incoming queue to `handler`.
581 return _incoming.fetch_messages(*
this, handler);
// Logs message counts and runs the state's cleanup.
584 void cleanup() final {
585 this->_log_message_counts();
586 conn_state().cleanup(*
this);
// When stat-level logging is enabled, logs chart samples of the queue counts
// whenever the trackers report a change.
590 void _log_message_counts() noexcept {
591 if constexpr(is_log_level_enabled_v<log_event_severity::stat>) {
592 if(_outgoing_count.has_changed(_outgoing.count())) {
593 this->log_chart_sample(
594 EAGINE_ID(outMsgCnt),
float(_outgoing_count.get()));
597 if(_incoming_count.has_changed(_incoming.count())) {
598 this->log_chart_sample(
599 EAGINE_ID(incMsgCnt),
float(_incoming_count.get()));
// Message queues and the change trackers used for logging their counts.
605 connection_outgoing_messages _outgoing{};
606 connection_incoming_messages _incoming{};
607 value_change_div_tracker<span_size_t, 16> _outgoing_count{0};
608 value_change_div_tracker<span_size_t, 16> _incoming_count{0};
// Datagram connection that uses a connection state and message queues passed
// in as shared pointers (see the constructor).
611 template <connection_addr_kind Kind>
612 class asio_datagram_client_connection
613 :
public asio_connection_base<Kind, connection_protocol::datagram> {
615 using base = asio_connection_base<Kind, connection_protocol::datagram>;
618 using base::conn_state;
// Takes the shared state and the shared outgoing / incoming queues.
620 asio_datagram_client_connection(
622 std::shared_ptr<asio_connection_state<Kind, connection_protocol::datagram>>
624 std::shared_ptr<connection_outgoing_messages> outgoing,
625 std::shared_ptr<connection_incoming_messages> incoming)
626 : base(parent, std::move(state))
627 , _outgoing{std::move(outgoing)}
628 , _incoming{std::move(incoming)} {}
// Packs queued outgoing messages into `data`.
631 EAGINE_ASSERT(_outgoing);
632 return _outgoing->pack_into(data);
// Lets the outgoing queue clean up the messages that were sent.
635 void on_sent(
const message_pack_info& to_be_removed) {
636 EAGINE_ASSERT(_outgoing);
637 return _outgoing->cleanup(to_be_removed);
// Pushes received data into the incoming queue.
641 EAGINE_ASSERT(_incoming);
642 _incoming->push(data);
// Enqueues a message for sending, using the state's push buffer.
645 auto send(message_id msg_id,
const message_view& message) ->
bool final {
646 EAGINE_ASSERT(_outgoing);
647 return _outgoing->enqueue(
648 *
this, msg_id, message,
cover(conn_state().push_buffer));
// Passes messages from the incoming queue to `handler`.
652 EAGINE_ASSERT(_incoming);
653 return _incoming->fetch_messages(*
this, handler);
// Only updates the state; no send or receive is started here.
656 auto update() ->
bool final {
657 some_true something_done{};
658 something_done(conn_state().update());
659 return something_done;
663 std::shared_ptr<connection_outgoing_messages> _outgoing;
664 std::shared_ptr<connection_incoming_messages> _incoming;
// Datagram connection that keeps per-endpoint message queues in two maps
// (_current and _pending) and serves as the group for its connection state.
667 template <connection_addr_kind Kind>
668 class asio_datagram_server_connection
669 :
public asio_connection_base<Kind, connection_protocol::datagram>
670 ,
public asio_connection_group<Kind, connection_protocol::datagram> {
672 using base = asio_connection_base<Kind, connection_protocol::datagram>;
673 using endpoint_type =
674 asio_endpoint_type<Kind, connection_protocol::datagram>;
678 using base::conn_state;
// Packs outgoing messages of one of the current endpoints into `dest`,
// iterating over _current by _index until the index returns to its starting
// value.
// NOTE(review): the index advancement and the handling of a non-empty pack
// are not visible in this listing.
681 -> message_pack_info
final {
682 EAGINE_ASSERT(_index >= 0);
683 const auto prev_idx{_index};
685 if(_index <
span_size(_current.size())) {
686 auto pos = _current.begin();
687 std::advance(pos, _index);
689 auto& [ep, out_in] = *pos;
690 auto& outgoing = std::get<0>(out_in);
691 EAGINE_ASSERT(outgoing);
692 const auto packed = outgoing->pack_into(dest);
693 if(!packed.is_empty()) {
700 }
while(_index != prev_idx);
// Lets the outgoing queue of endpoint `ep` clean up the sent messages.
705 const endpoint_type& ep,
706 const message_pack_info& to_be_removed)
final {
707 _outgoing(ep).cleanup(to_be_removed);
// Pushes received data into the incoming queue of endpoint `ep`.
711 _incoming(ep).push(data);
// Scans the incoming queues of both maps for a non-empty one.
714 auto has_received() ->
bool final {
715 for(
auto m : {&_current, &_pending}) {
716 for(
const auto& p : *m) {
717 const auto& incoming = std::get<1>(std::get<1>(p));
718 EAGINE_ASSERT(incoming);
719 if(!incoming->empty()) {
// send is marked unreachable for this class.
727 auto send(message_id,
const message_view&) ->
bool final {
728 EAGINE_UNREACHABLE();
// Marked unreachable as well; the function head is not visible here.
733 EAGINE_UNREACHABLE();
// Creates an asio_datagram_client_connection for every pending endpoint,
// sharing that endpoint's outgoing / incoming queues, and passes it to
// `handler`.
// NOTE(review): presumably pending entries are then moved into _current - not
// visible in this listing, confirm.
738 some_true something_done;
739 for(
auto& p : _pending) {
740 handler(std::make_unique<asio_datagram_client_connection<Kind>>(
743 std::get<0>(std::get<1>(p)),
744 std::get<1>(std::get<1>(p))));
750 this->
log_debug(
"accepted datagram endpoints")
751 .arg(
EAGINE_ID(current), _current.size());
753 return something_done;
// While the socket is open starts receive and send; then updates the state.
756 auto update() ->
bool final {
757 some_true something_done{};
758 if(conn_state().socket.is_open()) {
759 something_done(conn_state().start_receive(*
this));
760 something_done(conn_state().start_send(*
this));
764 something_done(conn_state().update());
765 return something_done;
// Runs the state's cleanup with this object as the group.
768 void cleanup() final {
769 conn_state().cleanup(*
this);
// Finds the queue pair for `ep` in _current, then in _pending; when it is in
// neither, new shared queues are created and the addition is logged.
773 auto _get(
const endpoint_type& ep) ->
auto& {
774 auto pos = _current.find(ep);
775 if(pos == _current.end()) {
776 pos = _pending.find(ep);
777 if(pos == _pending.end()) {
781 std::make_shared<connection_outgoing_messages>(),
782 std::make_shared<connection_incoming_messages>())
784 this->
log_debug(
"added pending datagram endpoint")
785 .arg(
EAGINE_ID(pending), _pending.size())
786 .arg(
EAGINE_ID(current), _current.size());
789 return std::get<1>(*pos);
// Outgoing queue of endpoint `ep`.
792 auto _outgoing(
const endpoint_type& ep) -> connection_outgoing_messages& {
793 auto& outgoing = std::get<0>(_get(ep));
794 EAGINE_ASSERT(outgoing);
// Incoming queue of endpoint `ep`.
798 auto _incoming(
const endpoint_type& ep) -> connection_incoming_messages& {
799 auto& incoming = std::get<1>(_get(ep));
800 EAGINE_ASSERT(incoming);
// Maps holding shared outgoing / incoming queues per endpoint.
807 std::shared_ptr<connection_outgoing_messages>,
808 std::shared_ptr<connection_incoming_messages>>>
809 _current{}, _pending{};
// Partial specialization of asio_connection_info; its arguments and body are
// not visible in this listing.
815 template <
typename Base>
816 class asio_connection_info<
// Selects the TCP socket as socket_type.
// NOTE(review): presumably inside an asio_types specialization for IPv4
// stream connections - the enclosing declaration is not visible, confirm.
836 using socket_type = asio::ip::tcp::socket;
// Connector derived from the IPv4 stream asio_connection; resolves the
// address and connects asynchronously. The class head is not visible here.
841 :
public asio_connection<
842 connection_addr_kind::ipv4,
843 connection_protocol::stream> {
846 asio_connection<connection_addr_kind::ipv4, connection_protocol::stream>;
847 using base::conn_state;
// Resolver, (host, port) address, one second reconnect timeout and the flag
// marking a connection attempt in progress.
849 asio::ip::tcp::resolver _resolver;
850 std::tuple<std::string, ipv4_port> _addr;
851 timeout _should_reconnect{std::chrono::seconds{1},
nothing};
852 bool _connecting{
false};
// Stores the resolved endpoint in the state and starts an asynchronous
// connect; the visible handler code retries with the next resolved entry
// until the end iterator is reached and clears _connecting when done.
855 _start_connect(asio::ip::tcp::resolver::iterator resolved,
ipv4_port port) {
856 auto& ep = conn_state().conn_endpoint = *resolved;
859 this->log_debug(
"connecting to ${host}:${port}")
863 conn_state().socket.async_connect(
864 ep, [
this, resolved, port](std::error_code error)
mutable {
866 this->log_debug(
"connected on address ${host}:${port}")
871 this->_connecting =
false;
873 if(++resolved != asio::ip::tcp::resolver::iterator{}) {
874 this->_start_connect(resolved, port);
878 "failed to connect on address "
879 "${address}:${port}: "
890 this->_connecting =
false;
// Starts asynchronous resolution of the host from _addr; the handler calls
// _start_connect, and the failure path logs the error and clears _connecting.
896 void _start_resolve() {
898 auto& [host, port] = _addr;
899 _resolver.async_resolve(
902 [
this, port{port}](std::error_code error,
auto resolved) {
904 this->_start_connect(resolved, port);
906 this->log_error(
"failed to resolve address: ${error}")
907 .arg(EAGINE_ID(error), error);
908 this->_connecting = false;
// Constructor tail: forwards to the base and binds the resolver to the shared
// io_context.
916 const std::shared_ptr<asio_common_state>& asio_state,
920 : base{parent, asio_state, block_size, pack_factr}
921 , _resolver{asio_state->context}
// With an open socket starts receive and send; otherwise, when not connecting
// and the reconnect timeout tests true, resets the timeout.
// NOTE(review): presumably a new resolve is started there - not visible here.
924 auto update() ->
bool final {
925 some_true something_done{};
926 if(conn_state().socket.is_open()) {
927 something_done(conn_state().start_receive(*
this));
928 something_done(conn_state().start_send(*
this));
929 }
else if(!_connecting) {
930 if(_should_reconnect) {
931 _should_reconnect.reset();
936 something_done(conn_state().update());
937 this->_log_message_counts();
938 return something_done;
// Acceptor for IPv4 stream connections; collects accepted TCP sockets and
// turns them into connections. The class head is not visible here.
944 :
public asio_connection_info<
946 connection_addr_kind::ipv4,
947 connection_protocol::stream>
948 ,
public main_ctx_object {
950 std::shared_ptr<asio_common_state> _asio_state;
951 std::tuple<std::string, ipv4_port> _addr;
952 asio::ip::tcp::acceptor _acceptor;
953 asio::ip::tcp::socket _socket;
// Sockets accepted so far, consumed by process_accepted.
957 std::vector<asio::ip::tcp::socket> _accepted;
// Replaces _socket with a fresh socket on the shared io_context and starts an
// asynchronous accept into it; the success path moves the socket into
// _accepted.
959 void _start_accept() {
960 log_debug(
"accepting connection on address ${host}:${port}")
964 _socket = asio::ip::tcp::socket(this->_asio_state->context);
965 _acceptor.async_accept(_socket, [
this](std::error_code error) {
967 log_debug(
"accepted connection on address ${host}:${port}")
971 this->_accepted.emplace_back(std::move(this->_socket));
974 "failed to accept connection on address "
// Constructor tail: the acceptor and the socket are bound to the shared
// io_context; block size and pack factor are stored for new connections.
989 std::shared_ptr<asio_common_state> asio_state,
992 float pack_factr) noexcept
993 : main_ctx_object{
EAGINE_ID(AsioAccptr), parent}
994 , _asio_state{std::move(asio_state)}
996 , _acceptor{_asio_state->context}
997 , _socket{_asio_state->context}
998 , _block_size{block_size}
999 , _pack_factr{pack_factr} {}
// Opens and binds the acceptor on an IPv4 endpoint with the port from _addr
// when it is not open, then polls and resets the shared io_context.
1001 auto update() ->
bool final {
1002 EAGINE_ASSERT(this->_asio_state);
1003 some_true something_done{};
1004 if(!_acceptor.is_open()) {
1006 asio::ip::tcp::v4(), std::get<1>(_addr));
1007 _acceptor.open(
endpoint.protocol());
1008 _acceptor.bind(endpoint);
1013 if(this->_asio_state->context.poll()) {
1016 this->_asio_state->context.reset();
1018 return something_done;
// Wraps every accepted socket in an IPv4 stream asio_connection and passes it
// to `handler`.
1021 auto process_accepted(
const accept_handler& handler) ->
bool final {
1022 some_true something_done{};
1023 for(
auto& socket : _accepted) {
1024 auto conn = std::make_unique<asio_connection<
1025 connection_addr_kind::ipv4,
1026 connection_protocol::stream>>(
1027 *
this, _asio_state, std::move(socket), _block_size, _pack_factr);
1028 handler(std::move(conn));
1032 return something_done;
// Partial specialization of asio_connection_info for the datagram protocol;
// the visible returns report a remote inter-process connection kind and the
// IPv4 address kind.
1038 template <
typename Base>
1039 class asio_connection_info<
1042 connection_protocol::datagram> :
public Base {
1045 return connection_kind::remote_interprocess;
1049 return connection_addr_kind::ipv4;
// Selects the UDP socket as socket_type.
// NOTE(review): presumably inside the matching asio_types specialization -
// the enclosing declaration is not visible, confirm.
1059 using socket_type = asio::ip::udp::socket;
// Connector derived from the IPv4 datagram asio_connection; resolves the
// address and opens the UDP socket. The class head is not visible here.
1064 :
public asio_connection<
1065 connection_addr_kind::ipv4,
1066 connection_protocol::datagram> {
1069 asio_connection<connection_addr_kind::ipv4, connection_protocol::datagram>;
1070 using base::conn_state;
// Resolver, (host, port) address, one second reconnect timeout and the flag
// marking a resolution in progress.
1072 asio::ip::udp::resolver _resolver;
1073 std::tuple<std::string, ipv4_port> _addr;
1074 timeout _should_reconnect{std::chrono::seconds{1},
nothing};
1075 bool _establishing{
false};
// Resolution callback: stores the resolved endpoint, opens the socket for the
// endpoint's protocol and clears _establishing.
1078 const asio::ip::udp::resolver::iterator& resolved,
1080 auto& ep = conn_state().conn_endpoint = *resolved;
1082 conn_state().socket.open(ep.protocol());
1083 this->_establishing =
false;
1085 this->log_debug(
"resolved address ${host}:${port}")
// Sets _establishing and starts asynchronous resolution of the host; the
// failure path logs the error and clears _establishing.
1090 void _start_resolve() {
1091 _establishing =
true;
1092 const auto& [host, port] = _addr;
1093 _resolver.async_resolve(
1094 host, {}, [
this, port{port}](std::error_code
error,
auto resolved) {
1096 this->_on_resolve(resolved, port);
1098 this->log_error(
"failed to resolve address: ${error}")
1100 this->_establishing =
false;
// Constructor tail: forwards to the base and binds the resolver to the shared
// io_context.
1108 const std::shared_ptr<asio_common_state>& asio_state,
1112 : base{parent, asio_state, block_size, pack_factr}
1113 , _resolver{asio_state->context}
// With an open socket starts receive and send; otherwise, when not
// establishing and the reconnect timeout tests true, resets the timeout.
// NOTE(review): presumably a new resolve is started there - not visible here.
1116 auto update() ->
bool final {
1117 some_true something_done{};
1118 if(conn_state().socket.is_open()) {
1119 something_done(conn_state().start_receive(*
this));
1120 something_done(conn_state().start_send(*
this));
1121 }
else if(!_establishing) {
1122 if(_should_reconnect) {
1123 _should_reconnect.reset();
1128 something_done(conn_state().update());
1129 this->_log_message_counts();
1130 return something_done;
// Acceptor for IPv4 datagram connections; delegates to an embedded
// asio_datagram_server_connection. The class head is not visible here.
1136 :
public asio_connection_info<
1138 connection_addr_kind::ipv4,
1139 connection_protocol::datagram>
1140 ,
public main_ctx_object {
1142 std::shared_ptr<asio_common_state> _asio_state;
1143 std::tuple<std::string, ipv4_port> _addr;
1145 asio_datagram_server_connection<connection_addr_kind::ipv4> _conn;
// Constructor tail: a UDP socket is constructed on the shared io_context with
// an IPv4 endpoint using the port from _addr.
1150 std::shared_ptr<asio_common_state> asio_state,
1153 float pack_factr) noexcept
1154 : main_ctx_object{
EAGINE_ID(AsioAccptr), parent}
1155 , _asio_state{std::move(asio_state)}
1160 asio::ip::udp::socket{
1161 _asio_state->context,
1162 asio::ip::udp::endpoint{asio::ip::udp::v4(), std::get<1>(_addr)}},
// Forwards to the server connection's update().
1166 auto update() ->
bool final {
1167 return _conn.update();
// Forwards to the server connection's process_accepted().
1170 auto process_accepted(
const accept_handler& handler) ->
bool final {
1171 return _conn.process_accepted(handler);
// Partial specialization of asio_connection_info for the stream protocol; the
// visible returns report a local inter-process connection kind and the
// filepath address kind.
1178 template <
typename Base>
1179 class asio_connection_info<
1182 connection_protocol::stream> :
public Base {
1185 return connection_kind::local_interprocess;
1189 return connection_addr_kind::filepath;
// Selects the local stream protocol socket as socket_type.
// NOTE(review): presumably inside the matching asio_types specialization -
// the enclosing declaration is not visible, confirm.
1199 using socket_type = asio::local::stream_protocol::socket;
// Connector derived from the filepath stream asio_connection; connects to a
// local socket path. The class head is not visible here.
1204 :
public asio_connection<
1205 connection_addr_kind::filepath,
1206 connection_protocol::stream> {
1207 using base = asio_connection<
1208 connection_addr_kind::filepath,
1209 connection_protocol::stream>;
// Socket path, one second reconnect timeout and the flag marking a connection
// attempt in progress.
1211 std::string _addr_str;
1212 timeout _should_reconnect{std::chrono::seconds{1},
nothing};
1213 bool _connecting{
false};
// Starts an asynchronous connect to the state's conn_endpoint; both visible
// outcomes (connected / failed) clear _connecting.
1215 void _start_connect() {
1217 this->log_debug(
"connecting to ${address}")
1220 conn_state().socket.async_connect(
1221 conn_state().conn_endpoint, [
this](std::error_code error)
mutable {
1223 this->log_debug(
"connected on address ${address}")
1225 _connecting =
false;
1227 this->log_error(
"failed to connect: ${error}")
1229 _connecting =
false;
// Returns addr_str when it tests true, otherwise the default socket path.
1234 static inline auto _fix_addr(
string_view addr_str) noexcept {
1235 return addr_str ? addr_str :
string_view{
"/tmp/eagine-msgbus.socket"};
// Constructor tail: stores the fixed-up address and initializes the state's
// conn_endpoint from it.
1241 const std::shared_ptr<asio_common_state>& asio_state,
1245 : base{parent, asio_state, block_size, pack_factr}
1246 , _addr_str{_fix_addr(addr_str)} {
1247 conn_state().conn_endpoint = {_addr_str.c_str()};
// With an open socket starts receive and send; otherwise, when not connecting
// and the reconnect timeout tests true, resets the timeout.
// NOTE(review): presumably a new connect is started there - not visible here.
1250 auto update() ->
bool final {
1251 some_true something_done{};
1252 if(conn_state().socket.is_open()) {
1253 something_done(conn_state().start_receive(*
this));
1254 something_done(conn_state().start_send(*
this));
1255 }
else if(!_connecting) {
1256 if(_should_reconnect) {
1257 _should_reconnect.reset();
1262 something_done(conn_state().update());
1263 this->_log_message_counts();
1264 return something_done;
// Acceptor for local stream connections bound to a socket file path. The
// class head is not visible here.
1270 :
public asio_connection_info<
1272 connection_addr_kind::filepath,
1273 connection_protocol::stream>
1274 ,
public main_ctx_object {
1276 std::shared_ptr<asio_common_state> _asio_state;
1277 std::string _addr_str;
1278 asio::local::stream_protocol::acceptor _acceptor;
// Set while an asynchronous accept is pending.
1281 bool _accepting{
false};
// Sockets accepted so far, consumed by process_accepted.
1283 std::vector<asio::local::stream_protocol::socket> _accepted;
// Starts an asynchronous accept; the handler clears _accepting and the
// success path moves the new socket into _accepted.
1285 void _start_accept() {
1286 log_debug(
"accepting connection on address ${address}")
1290 _acceptor.async_accept([
this](
1291 std::error_code error,
1292 asio::local::stream_protocol::socket socket) {
1294 this->_accepting =
false;
1297 "failed to accept connection on address ${address}: "
1302 this->log_debug(
"accepted connection on address ${address}")
1304 this->_accepted.emplace_back(std::move(socket));
// Returns addr_str when it tests true, otherwise the default socket path.
1310 static inline auto _fix_addr(
string_view addr_str) noexcept {
1311 return addr_str ? addr_str :
string_view{
"/tmp/eagine-msgbus.socket"};
// Removes the file at addr_str via std::remove; used while initializing
// _asio_state in the constructor.
1314 static inline auto _prepare(
1315 std::shared_ptr<asio_common_state> asio_state,
1316 string_view addr_str) -> std::shared_ptr<asio_common_state> {
1317 std::remove(
c_str(addr_str));
// Constructor tail: _prepare runs as part of initializing _asio_state, and
// the acceptor is constructed on the socket path endpoint.
1324 std::shared_ptr<asio_common_state> asio_state,
1326 : main_ctx_object{
EAGINE_ID(AsioAccptr), parent}
1327 , _asio_state{_prepare(std::move(asio_state), _fix_addr(addr_str))}
1328 , _addr_str{to_string(_fix_addr(addr_str))}
1330 _asio_state->context,
1331 asio::local::stream_protocol::endpoint(_addr_str.c_str())}
1332 , _block_size{block_size}
1333 , _pack_factr{pack_factr} {
// Removes the socket file on destruction.
1336 ~asio_acceptor() noexcept
override {
1338 std::remove(_addr_str.c_str());
// Neither copyable nor movable.
1343 asio_acceptor(asio_acceptor&&) =
delete;
1344 asio_acceptor(
const asio_acceptor&) =
delete;
1345 auto operator=(asio_acceptor&&) =
delete;
1346 auto operator=(
const asio_acceptor&) =
delete;
// Re-creates the acceptor on the socket path when it is not open, has a
// branch for an open acceptor with no accept pending, then polls and resets
// the shared io_context.
// NOTE(review): presumably that branch calls _start_accept - not visible here.
1348 auto update() ->
bool final {
1349 EAGINE_ASSERT(this->_asio_state);
1350 some_true something_done{};
1351 if(EAGINE_UNLIKELY(!_acceptor.is_open())) {
1353 _asio_state->context,
1354 asio::local::stream_protocol::endpoint(_addr_str.c_str())};
1357 if(_acceptor.is_open() && !_accepting) {
1361 if(this->_asio_state->context.poll()) {
1364 this->_asio_state->context.reset();
1366 return something_done;
// Wraps every accepted socket in a filepath stream asio_connection and passes
// it to `handler`.
1369 auto process_accepted(
const accept_handler& handler) ->
bool final {
1370 some_true something_done{};
1371 for(
auto& socket : _accepted) {
1372 auto conn = std::make_unique<asio_connection<
1373 connection_addr_kind::filepath,
1374 connection_protocol::stream>>(
1375 *
this, _asio_state, std::move(socket), _block_size, _pack_factr);
1376 handler(std::move(conn));
1380 return something_done;
1384 #endif // EAGINE_POSIX
// Factory creating acceptors and connectors for the given address kind and
// protocol; all of them share one asio_common_state.
1388 template <connection_addr_kind Kind, connection_protocol Proto>
1389 class asio_connection_factory
1390 :
public asio_connection_info<connection_factory, Kind, Proto>
1391 ,
public main_ctx_object {
1393 std::shared_ptr<asio_common_state> _asio_state;
// Default block size overload taking kind and protocol tags; its body is not
// visible in this listing.
1395 template <connection_addr_kind K, connection_protocol P>
1396 static constexpr
auto _default_block_size(
1397 connection_addr_kind_tag<K>,
1398 connection_protocol_tag<P>) noexcept ->
span_size_t {
// Default pack factor; branches on the filepath address kind and on the
// stream protocol (the returned values are not visible in this listing).
1402 template <connection_addr_kind K, connection_protocol P>
1403 static constexpr
auto _default_pack_factr(
1404 connection_addr_kind_tag<K>,
1405 connection_protocol_tag<P>) noexcept ->
float {
1406 if(K == connection_addr_kind::filepath) {
1409 if(P == connection_protocol::stream) {
// Additional default block size overload templated on the address kind only.
1415 template <connection_addr_kind K>
1416 static constexpr
auto _default_block_size(
1417 connection_addr_kind_tag<K>,
1423 float _pack_factr{0.5F};
// Keep the make_acceptor / make_connector overloads of connection_factory
// visible next to the overrides declared here.
1426 using connection_factory::make_acceptor;
1427 using connection_factory::make_connector;
// Default block size for this factory's Kind and Proto.
1429 static constexpr
auto default_block_size() noexcept ->
span_size_t {
1430 return _default_block_size(
1431 connection_addr_kind_tag<Kind>{}, connection_protocol_tag<Proto>{});
// Default pack factor for this factory's Kind and Proto.
1434 static constexpr
auto default_pack_factor() noexcept ->
float {
1435 return _default_pack_factr(
1436 connection_addr_kind_tag<Kind>{}, connection_protocol_tag<Proto>{});
// Main constructor: stores the shared state, block size and pack factor.
1439 asio_connection_factory(
1441 std::shared_ptr<asio_common_state> asio_state,
1443 float pack_factr) noexcept
1444 : main_ctx_object{
EAGINE_ID(AsioConnFc), parent}
1445 , _asio_state{std::move(asio_state)}
1446 , _block_size{block_size}
1447 , _pack_factr{pack_factr} {}
// Delegating constructor that creates a new asio_common_state.
1449 asio_connection_factory(
1453 : asio_connection_factory{
1455 std::make_shared<asio_common_state>(),
// Delegating constructor that uses the default block size and pack factor.
1460 : asio_connection_factory{
1462 default_block_size(),
1463 default_pack_factor()} {}
// Creates an acceptor for the given address string.
1466 -> std::unique_ptr<acceptor>
final {
1467 return std::make_unique<asio_acceptor<Kind, Proto>>(
1468 *
this, _asio_state, addr_str, _block_size, _pack_factr);
// Creates a connector for the given address string.
1472 -> std::unique_ptr<connection>
final {
1473 return std::make_unique<asio_connector<Kind, Proto>>(
1474 *
this, _asio_state, addr_str, _block_size, _pack_factr);
// Factory alias for IPv4 stream connections.
1478 using asio_tcp_ipv4_connection_factory = asio_connection_factory<
1479 connection_addr_kind::ipv4,
1480 connection_protocol::stream>;
// Factory alias for IPv4 datagram connections.
1482 using asio_udp_ipv4_connection_factory = asio_connection_factory<
1483 connection_addr_kind::ipv4,
1484 connection_protocol::datagram>;
// Factory alias for filepath stream connections.
1487 using asio_local_stream_connection_factory = asio_connection_factory<
1488 connection_addr_kind::filepath,
1489 connection_protocol::stream>;
1490 #endif // EAGINE_POSIX
1494 #endif // EAGINE_MESSAGE_BUS_ASIO_HPP