Go to the documentation of this file.
9 #ifndef EAGINE_MESSAGE_BUS_DIRECT_HPP
10 #define EAGINE_MESSAGE_BUS_DIRECT_HPP
12 #include "../bool_aggregate.hpp"
13 #include "../branch_predict.hpp"
14 #include "../main_ctx_object.hpp"
15 #include "../value_tracker.hpp"
35 std::unique_lock lock{_c2s_mutex};
41 std::unique_lock lock{_s2c_mutex};
47 std::unique_lock lock{_c2s_mutex};
48 return _client_to_server.
fetch_all(handler);
53 std::unique_lock lock{_s2c_mutex};
54 return _server_to_client.
fetch_all(handler);
57 void log_message_counts() noexcept {
58 if constexpr(is_log_level_enabled_v<log_event_severity::stat>) {
60 std::unique_lock lock{_s2c_mutex};
61 if(_s2c_count.has_changed(_server_to_client.
count())) {
63 EAGINE_ID(s2cMsgCnt),
float(_s2c_count.get()));
68 std::unique_lock lock{_c2s_mutex};
69 if(_c2s_count.has_changed(_client_to_server.
count())) {
71 EAGINE_ID(c2sMsgCnt),
float(_c2s_count.get()));
78 std::mutex _s2c_mutex;
79 std::mutex _c2s_mutex;
80 message_storage _server_to_client;
81 message_storage _client_to_server;
82 value_change_div_tracker<span_size_t, 16> _s2c_count{0};
83 value_change_div_tracker<span_size_t, 16> _c2s_count{0};
109 auto state{std::make_shared<direct_connection_state>(*
this)};
110 _pending.push_back(state);
118 for(
auto& state : _pending) {
123 return something_done;
127 std::vector<shared_state> _pending;
134 template <
typename Base>
160 std::shared_ptr<direct_connection_address>&
address) noexcept
162 , _state{
address->connect()} {}
170 if(EAGINE_LIKELY(_state)) {
171 _state->log_message_counts();
178 if(EAGINE_LIKELY(_state)) {
179 _state->send_to_server(msg_id,
message);
187 if(EAGINE_LIKELY(_state)) {
188 return _state->fetch_from_server(handler);
194 if(EAGINE_LIKELY(_state)) {
195 _state->log_message_counts();
200 std::weak_ptr<direct_connection_address> _weak_address;
201 std::shared_ptr<direct_connection_state> _state;
203 inline void _checkup() {
204 if(EAGINE_UNLIKELY(!_state)) {
205 if(
auto address{_weak_address.lock()}) {
220 : _weak_state{state} {}
223 if(
auto state{_weak_state.lock()}) {
224 state->send_to_client(msg_id,
message);
231 if(
auto state{_weak_state.lock()}) {
232 return state->fetch_from_client(handler);
238 std::weak_ptr<direct_connection_state> _weak_state;
247 using shared_state = std::shared_ptr<direct_connection_state>;
253 std::shared_ptr<direct_connection_address>
address) noexcept
255 , _address{std::move(
address)} {}
260 , _address{std::make_shared<direct_connection_address>(*
this)} {}
265 auto wrapped_handler = [&handler](shared_state& state) {
266 handler(std::unique_ptr<connection>{
267 std::make_unique<direct_server_connection>(state)});
270 _address->process_all({construct_from, wrapped_handler}));
272 return something_done;
278 return std::unique_ptr<connection>{
279 std::make_unique<direct_client_connection>(_address)};
285 std::shared_ptr<direct_connection_address> _address{};
301 , _default_addr{_make_addr()} {}
304 -> std::unique_ptr<acceptor>
final {
306 return std::make_unique<direct_acceptor>(*
this, _get(addr_str));
308 return std::make_unique<direct_acceptor>(*
this, _default_addr);
312 -> std::unique_ptr<connection>
final {
314 return std::make_unique<direct_client_connection>(_get(addr_str));
316 return std::make_unique<direct_client_connection>(_default_addr);
320 std::shared_ptr<direct_connection_address> _default_addr;
323 std::shared_ptr<direct_connection_address>,
327 auto _make_addr() -> std::shared_ptr<direct_connection_address> {
328 return std::make_shared<direct_connection_address>(*
this);
332 -> std::shared_ptr<direct_connection_address>& {
333 auto pos = _addrs.find(addr_str);
334 if(pos == _addrs.end()) {
335 pos = _addrs.emplace(to_string(addr_str), _make_addr()).first;
337 EAGINE_ASSERT(pos != _addrs.end());
344 #endif // EAGINE_MESSAGE_BUS_DIRECT_HPP
auto make_acceptor()
Make a new acceptor listening on a default address.
Definition: conn_factory.hpp:34
Helper class used to initialize main context objects.
Definition: main_ctx_object.hpp:45
Implementation of client-side direct connection.
Definition: direct.hpp:157
auto make_connector(string_view addr_str) -> std::unique_ptr< connection > final
Make a new connector connecting to the specified address.
Definition: direct.hpp:311
void send_to_server(message_id msg_id, const message_view &message)
Sends a message to the server counterpart.
Definition: direct.hpp:34
Declaration of class template storing a reference to a callable object.
Definition: callable_ref.hpp:24
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
std::shared_ptr< direct_connection_state > shared_state
Alias for shared pointer to direct state type.
Definition: direct.hpp:96
Implementation of server-side direct connection.
Definition: direct.hpp:217
Base class for main context objects.
Definition: main_ctx_object.hpp:71
auto fetch_all(fetch_handler handler) -> bool
Fetches all currently stored messages and calls handler on them.
direct_connection_address(main_ctx_parent parent)
Construction from a parent main context object.
Definition: direct.hpp:103
auto fetch_messages(connection::fetch_handler handler) -> bool final
Fetch all enqueued messages that have been received since last fetch.
Definition: direct.hpp:230
void cleanup() final
Cleans up the connection before destroying it.
Definition: direct.hpp:193
Comparator template for standard string - string span comparisons.
Definition: string_span.hpp:165
Implementation of acceptor for direct connections.
Definition: direct.hpp:244
auto update() -> bool final
Updates the internal state of the connection (called repeatedly).
Definition: direct.hpp:169
auto fetch_from_server(connection::fetch_handler handler) noexcept -> bool
Fetches received messages from the service counterpart.
Definition: direct.hpp:52
auto log_chart_sample(identifier series, float value) noexcept -> named_logging_object &
Stores a new value in the specified chart data series.
Definition: logger.hpp:372
@ in_process
In-process connection (cannot be used for inter-process communication).
direct_acceptor(main_ctx_parent parent, std::shared_ptr< direct_connection_address > address) noexcept
Construction from a parent main context object and an address object.
Definition: direct.hpp:251
auto make_acceptor(string_view addr_str) -> std::unique_ptr< acceptor > final
Make a new acceptor listening on the specified address.
Definition: direct.hpp:303
direct_connection_factory(main_ctx_parent parent)
Construction from a parent main context object with implicit address.
Definition: direct.hpp:299
auto process_accepted(const accept_handler &handler) -> bool final
Lets the handler process the pending accepted connections.
Definition: direct.hpp:262
auto make_connector()
Make a new connector connecting to the specified address.
Definition: conn_factory.hpp:40
auto count() const noexcept -> span_size_t
Returns the coung of messages in the storage.
Definition: message.hpp:459
Common shared state for a direct connection.
Definition: direct.hpp:27
auto make_connection() -> std::unique_ptr< connection >
Makes a new client-side direct connection.
Definition: direct.hpp:276
basic_address< false > address
Type alias for non-const memory address values.
Definition: address.hpp:203
Class acting as the "address" of a direct connection.
Definition: direct.hpp:93
Combines message information and a non-owning view to message content.
Definition: message.hpp:288
Class storing initially false value and logically or-ing other values.
Definition: bool_aggregate.hpp:16
void send_to_client(message_id msg_id, const message_view &message)
Sends a message to the client counterpart.
Definition: direct.hpp:40
auto is_usable() -> bool final
Checks if the connection is in usable state.
Definition: direct.hpp:164
auto fetch_messages(connection::fetch_handler handler) -> bool final
Fetch all enqueued messages that have been received since last fetch.
Definition: direct.hpp:185
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
auto process_all(process_handler handler) -> bool
Handles the pending server counterparts for created client connections.
Definition: direct.hpp:116
direct_connection_state(main_ctx_parent parent)
Construction from a parent main context object.
Definition: direct.hpp:30
@ message
Message protocol.
connection_addr_kind
Message bus connection address kind enumeration.
Definition: connection.hpp:30
auto send(message_id msg_id, const message_view &message) -> bool final
Sent a message with the specified id.
Definition: direct.hpp:176
connection_kind
Message bus connection kind bits enumeration.
Definition: connection_kind.hpp:21
Implementation of the connection_info interface for direct connections.
Definition: direct.hpp:135
void push(message_id msg_id, const message_view &message)
Pushes a message into this storage.
Definition: message.hpp:464
Class storing two identifier values representing class/method pair.
Definition: message_id.hpp:25
direct_acceptor(main_ctx_parent parent)
Construction from a parent main context object with implicit address.
Definition: direct.hpp:258
Implementation of connection_factory for direct connections.
Definition: direct.hpp:291
auto connect() -> shared_state
Creates and returns the shared state for a new client connection.
Definition: direct.hpp:108
auto send(message_id msg_id, const message_view &message) -> bool final
Sent a message with the specified id.
Definition: direct.hpp:222
auto fetch_from_client(connection::fetch_handler handler) noexcept -> bool
Fetches received messages from the client counterpart.
Definition: direct.hpp:46