Copyright Matus Chochlik. Distributed under the Boost Software License, Version 1.0. See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
namespace msgbus {
using pong_base = service_composition<pingable<build_info_provider<
system_info_provider<host_info_provider<shutdown_target<>>>>>>;
class pong_example
: public main_ctx_object
, public pong_base {
using base = pong_base;
public:
, base{bus} {}
-> bool final {
if(EAGINE_UNLIKELY((++_sent % _mod) == 0)) {
_log.info(
"sent ${sent} pongs").arg(
EAGINE_ID(sent), _sent);
}
return true;
}
void on_shutdown(
std::chrono::milliseconds age,
log_info(
"received shutdown request from ${source}")
_done = true;
}
auto is_done() const noexcept -> bool {
return _done;
}
some_true something_done{};
if(_sent < 1) {
if(_announce_timeout) {
something_done();
}
}
return something_done;
}
private:
logger _log{};
std::intmax_t _mod{10000};
std::intmax_t _sent{0};
resetting_timeout _announce_timeout{std::chrono::seconds(5)};
bool _done{false};
};
}
auto main(main_ctx& ctx) -> int {
msgbus::registry the_reg{ctx};
valid_if_positive<int> opt_ponger_count{};
if(
auto arg{ctx.args().
find(
"--ponger-count")}) {
arg.next().parse(opt_ponger_count, ctx.log().error_stream());
}
const auto ponger_count = extract_or(opt_ponger_count, 1);
std::atomic<int> still_working(ponger_count);
std::vector<std::thread> workers;
workers.reserve(
std_size(ponger_count));
for(int p = 0; p < ponger_count; ++p) {
auto& bus = the_reg.establish(
EAGINE_ID(PongEndpt));
workers.emplace_back([&still_working, &bus]() {
msgbus::pong_example ponger(bus);
while(!ponger.is_done()) {
ponger.process_all();
if(!ponger.update()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
--still_working;
});
}
while(still_working) {
the_reg.update();
}
for(auto& worker : workers) {
worker.join();
}
return 0;
}
}
auto main(int argc, const char** argv) -> int {
return eagine::main_impl(argc, argv, options);
}
void announce_subscriptions() const
Sends messages to the bus saying which messages this can handle.
Definition: subscriber.hpp:438
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
bitfield< verification_bit > verification_bits
Alias for a bus message verification bitfield.
Definition: verification.hpp:47
Common code is placed in this namespace.
Definition: eagine.hpp:21
auto update() const noexcept -> bool
Updates the internal endpoint state (should be called repeatedly).
Definition: subscriber.hpp:51
@ endpoint
Message bus client endpoint.
main_ctx_object(identifier obj_id, main_ctx_parent parent) noexcept
Initialization from object id and parent.
Definition: main_ctx_object.hpp:77
static auto find(basic_span< T1, P1, S1 > where, basic_span< T2, P2, S2 > what) -> basic_span< T1, P1, S1 >
Finds the position of the last occurrence of what in a span.
Definition: span_algo.hpp:374
auto log_info(string_view format) noexcept
Create a log message entry for information, with specified format.
Definition: logger.hpp:329
static constexpr auto std_size(T v) noexcept
Converts argument to std size type.
Definition: types.hpp:52
std::uint32_t message_sequence_t
Alias for message sequence number type.
Definition: types.hpp:22
@ source_id
The source has been verified.
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
auto bus() noexcept -> auto &
Returns a reference to the associated endpoint.
Definition: subscriber.hpp:38
identifier app_id
The application root logger identifier.
Definition: main_ctx.hpp:30
Structure storing customization options for main context.
Definition: main_ctx.hpp:24