Copyright Matus Chochlik. Distributed under the Boost Software License, Version 1.0. See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <map>
#include <vector>
namespace msgbus {
struct ping_stats {
std::string hostname;
std::chrono::microseconds min_time{std::chrono::microseconds::max()};
std::chrono::steady_clock::time_point start{
std::chrono::steady_clock::now()};
std::chrono::steady_clock::time_point finish{
std::chrono::steady_clock::now()};
std::intmax_t responded{0};
std::intmax_t timeouted{0};
resetting_timeout should_check_info{std::chrono::seconds(5),
nothing};
auto avg_time() const noexcept {
return sum_time / responded;
}
auto time_interval() const noexcept {
return std::chrono::duration_cast<std::chrono::duration<float>>(
finish - start);
}
auto total_count() const noexcept {
return float(responded) + float(timeouted);
}
auto respond_rate() const noexcept {
}
auto responds_per_second() const noexcept {
return math::ratio(
float(responded), time_interval().count());
}
};
using ping_base = service_composition<
pinger<host_info_consumer<subscriber_discovery<shutdown_invoker<>>>>>;
class ping_example
: public main_ctx_object
, public ping_base {
using base = ping_base;
public:
ping_example(
endpoint&
bus,
const valid_if_positive<std::intmax_t>& max)
, base{bus}
, _max{extract_or(max, 100000)} {
}
void is_alive(const subscriber_info&) final {}
void on_subscribed(
const subscriber_info&
info,
message_id sub_msg)
final {
if(sub_msg == this->ping_msg_id()) {
if(_targets.try_emplace(
info.endpoint_id, ping_stats{}).second) {
}
}
}
void on_unsubscribed(
const subscriber_info&
info,
message_id sub_msg)
final {
if(sub_msg == this->ping_msg_id()) {
}
}
void not_subscribed(
const subscriber_info&
info,
message_id sub_msg)
final {
if(sub_msg == this->ping_msg_id()) {
log_info(
"target ${id} is not pingable")
}
}
void on_host_id_received(
const result_context& res_ctx,
valid_if_positive<host_id_t>&&
host_id)
final {
auto& stats = _targets[res_ctx.source_id()];
}
}
void on_hostname_received(
const result_context& res_ctx,
valid_if_not_empty<std::string>&& hostname) final {
if(hostname) {
auto& stats = _targets[res_ctx.source_id()];
stats.hostname =
extract(std::move(hostname));
}
}
void on_ping_response(
std::chrono::microseconds age,
auto& stats = _targets[pinger_id];
stats.responded++;
stats.min_time = std::min(stats.min_time, age);
stats.max_time = std::max(stats.max_time, age);
stats.sum_time += age;
stats.finish = std::chrono::steady_clock::now();
if(EAGINE_UNLIKELY((++_rcvd % _mod) == 0)) {
const auto now{std::chrono::steady_clock::now()};
const std::chrono::duration<float> interval{now - prev_log};
if(EAGINE_LIKELY(interval > decltype(interval)::
zero())) {
const auto msgs_per_sec{float(_mod) / interval.count()};
}
prev_log = now;
}
}
void on_ping_timeout(
std::chrono::microseconds) final {
auto& stats = _targets[pinger_id];
stats.timeouted++;
if(EAGINE_UNLIKELY((++_tout % _mod) == 0)) {
}
}
auto is_done() const noexcept -> bool {
return !(((_rcvd + _tout + _mod) < _max) || this->has_pending_pings());
}
some_true something_done{};
if(EAGINE_UNLIKELY(_should_query_pingable)) {
query_pingables();
}
if(!_targets.empty()) {
for(auto& [pingable_id, entry] : _targets) {
if(_rcvd < _max) {
const auto lim{
_rcvd +
static_cast<std::intmax_t>(
_mod * (1 + std::log(float(1 + _targets.size()))))};
if(_sent < lim) {
this->ping(pingable_id, std::chrono::seconds(5));
if(EAGINE_UNLIKELY((++_sent % _mod) == 0)) {
}
if(EAGINE_UNLIKELY(entry.should_check_info)) {
if(!entry.host_id) {
this->query_host_id(pingable_id);
}
if(entry.hostname.empty()) {
this->query_hostname(pingable_id);
}
}
something_done();
}
} else {
break;
}
}
}
return something_done;
}
void shutdown() {
for(auto& entry : _targets) {
this->shutdown_one(std::get<0>(entry));
}
}
void log_stats() {
for(
auto& [
id,
info] : _targets) {
.arg(
not_avail)
.arg(
info.responds_per_second(),
not_avail);
}
}
private:
resetting_timeout _should_query_pingable{std::chrono::seconds(2),
nothing};
std::chrono::steady_clock::time_point prev_log{
std::chrono::steady_clock::now()};
std::map<identifier_t, ping_stats> _targets{};
std::intmax_t _mod{5000};
std::intmax_t _max{100000};
std::intmax_t _sent{0};
std::intmax_t _rcvd{0};
std::intmax_t _tout{0};
};
}
auto main(main_ctx& ctx) -> int {
ctx.preinitialize();
msgbus::router_address
address{ctx};
msgbus::connection_setup conn_setup(ctx);
msgbus::endpoint bus{
EAGINE_ID(PingEndpt), ctx};
valid_if_positive<std::intmax_t> ping_count{};
if(
auto arg{ctx.args().
find(
"--ping-count")}) {
arg.next().parse(ping_count, ctx.log().error_stream());
}
msgbus::ping_example the_pinger{bus, ping_count};
conn_setup.setup_connectors(the_pinger,
address);
resetting_timeout do_chart_stats{std::chrono::seconds(15),
nothing};
while(!the_pinger.is_done()) {
the_pinger.process_all();
if(!the_pinger.update()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if(do_chart_stats) {
the_pinger.log_chart_sample(
EAGINE_ID(shortLoad), ctx.system().short_average_load());
the_pinger.log_chart_sample(
EAGINE_ID(longLoad), ctx.system().long_average_load());
if(auto temp_k{ctx.system().cpu_temperature()}) {
the_pinger.log_chart_sample(
extract(temp_k).to<units::degree_celsius>());
}
}
}
}
the_pinger.shutdown();
the_pinger.log_stats();
return 0;
}
}
auto main(int argc, const char** argv) -> int {
return eagine::main_impl(argc, argv, options);
}