Copyright Matus Chochlik. Distributed under the Boost Software License, Version 1.0. See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt
#include <thread>
namespace msgbus {
/// Ping side of a ping/pong exchange, implemented as an actor<2>.
///
/// Keeps counters of sent pings (_sent) and received pongs (_rcvd), logs
/// progress through bus().log_info and reports completion via is_done().
///
/// NOTE(review): several statements in this class look truncated in this
/// view (constructor initializer list / body, the receivers of two
/// `.log_info(...)` calls, and any actual message send calls) -- confirm
/// against the complete file before relying on these comments.
class ping : public actor<2> {
public:
/// Shorthand for the actor base class.
using base = actor<2>;
/// Constructs the ping actor.
///
/// @param conn_setup object whose setup_connectors is invoked with *this.
/// @param max        value wrapped in valid_if_positive; presumably the
///                   requested ping count -- its use is not visible here,
///                   TODO confirm.
///
/// The visible part enables subscriptions via allow_subscriptions() and
/// asks conn_setup to set up connectors for this actor.
ping(
connection_setup& conn_setup,
const valid_if_positive<std::size_t>& max)
: base(
this,
// NOTE(review): the rest of the base(...) argument list and the opening
// of the constructor body are not visible in this view.
this->allow_subscriptions();
conn_setup.setup_connectors(
*this,
}
/// Handler invoked for a received pong (both arguments are unused).
///
/// Increments _rcvd, emits a progress log entry on every _lmod-th pong and
/// resets _timeout as long as fewer than _max pongs have been received.
///
/// @return always true.
auto pong(const message_context&, stored_message&) -> bool {
if(++_rcvd % _lmod == 0) {
// NOTE(review): the receiver of this call and the binding of ${count}
// are not visible here.
.log_info("received ${count} pongs")
}
if(_rcvd < _max) {
// Only re-arm the timeout while more pongs are still expected.
_timeout.reset();
}
return true;
}
/// Handler invoked for the "pong ready" message (both arguments unused).
///
/// Sets _ready, which is one of the conditions checked by update(),
/// and logs the event.
///
/// @return always true.
auto ready(const message_context&, stored_message&) -> bool {
_ready = true;
bus().log_info(
"received pong ready message");
return true;
}
/// Logs that the shutdown message was sent.
///
/// NOTE(review): only the log call is visible here; the statement that
/// actually sends the shutdown message is not -- confirm.
void shutdown() {
bus().log_info(
"sent shutdown message");
}
/// Performs one iteration of the ping side.
///
/// When _ready is set, _sent has not exceeded _max * 2 and _sent is less
/// than _rcvd + _lmod, the sent counter is incremented and every
/// _lmod-th increment is logged. Otherwise the calling thread yields.
///
/// NOTE(review): no send call is visible in the first branch -- confirm.
void update() {
if(_ready && (_sent <= _max * 2) && (_sent < _rcvd + _lmod)) {
if(++_sent % _lmod == 0) {
// NOTE(review): the receiver of this call and the binding of ${count}
// are not visible here.
.log_info("sent ${count} pings")
}
} else {
// Nothing to do in this iteration; let other threads run.
std::this_thread::yield();
}
}
/// Indicates whether the exchange is finished.
///
/// @return true when at least _max pongs were received or when _timeout
///         converts to true (presumably meaning it expired -- TODO
///         confirm the semantics of timeout's bool conversion).
auto is_done() const noexcept -> bool {
return (_rcvd >= _max) || _timeout;
}
/// Computes the throughput for the given elapsed time.
///
/// @param s elapsed time in seconds.
/// @return _rcvd (the number of received pongs) divided by s.count(),
///         as float. No guard against s.count() being zero is present.
auto pings_per_second(std::chrono::duration<float> s) const noexcept {
return float(_rcvd) / s.count();
}
private:
// Logging modulus: progress is logged on every _lmod-th ping / pong.
std::size_t _lmod{1};
// Counter incremented in update().
std::size_t _sent{0};
// Counter incremented in pong().
std::size_t _rcvd{0};
// Number of received pongs at which is_done() becomes true.
const std::size_t _max{1000000};
// Constructed with 30 seconds; reset in pong(), checked in is_done().
timeout _timeout{std::chrono::seconds(30)};
// Set by ready(); update() does not advance _sent until it is true.
bool _ready{false};
};
}
/// Entry point of the ping example.
///
/// Builds the router address and connection setup from the main context,
/// optionally parses the value following the `--ping-count` command-line
/// argument, then drives the ping actor until it reports completion and
/// logs the measured run time together with the achieved throughput.
///
/// @param ctx main context providing program arguments and the logger.
/// @return always 0.
auto main(main_ctx& ctx) -> int {
    msgbus::router_address address{ctx};
    msgbus::connection_setup conn_setup(ctx);

    // Stays default-constructed unless --ping-count is given on the
    // command line; parse diagnostics go to the logger's error stream.
    valid_if_positive<std::size_t> ping_count{};
    if(auto arg{ctx.args().find("--ping-count")}) {
        arg.next().parse(ping_count, ctx.log().error_stream());
    }

    msgbus::ping ping(ctx, conn_setup, address, ping_count);

    // Measure only the update / process loop.
    const time_measure run_time;
    while(!ping.is_done()) {
        ping.update();
        ping.process_all();
    }
    const auto elapsed = run_time.seconds();

    // Bind both placeholders used in the message; previously ${time} was
    // declared in the format string but never given a value.
    ctx.log()
      .info("execution time ${time}, ${pps} pings per second")
      .arg(EAGINE_ID(time), elapsed)
      .arg(EAGINE_ID(pps), ping.pings_per_second(elapsed));

    ping.shutdown();
    return 0;
}
}