OGLplus  (0.59.0) a C++ wrapper for rendering APIs

eagine/message_bus/007_ping.cpp

Copyright Matus Chochlik. Distributed under the Boost Software License, Version 1.0. See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <map>
#include <vector>
namespace eagine {
namespace msgbus {
//------------------------------------------------------------------------------
struct ping_stats {
std::string hostname;
std::chrono::microseconds min_time{std::chrono::microseconds::max()};
std::chrono::microseconds max_time{std::chrono::microseconds::zero()};
std::chrono::microseconds sum_time{std::chrono::microseconds::zero()};
std::chrono::steady_clock::time_point start{
std::chrono::steady_clock::now()};
std::chrono::steady_clock::time_point finish{
std::chrono::steady_clock::now()};
std::intmax_t responded{0};
std::intmax_t timeouted{0};
resetting_timeout should_check_info{std::chrono::seconds(5), nothing};
auto avg_time() const noexcept {
return sum_time / responded;
}
auto time_interval() const noexcept {
return std::chrono::duration_cast<std::chrono::duration<float>>(
finish - start);
}
auto total_count() const noexcept {
return float(responded) + float(timeouted);
}
auto respond_rate() const noexcept {
return math::ratio(float(responded), total_count());
}
auto responds_per_second() const noexcept {
return math::ratio(float(responded), time_interval().count());
}
};
//------------------------------------------------------------------------------
using ping_base = service_composition<
pinger<host_info_consumer<subscriber_discovery<shutdown_invoker<>>>>>;
class ping_example
: public main_ctx_object
, public ping_base {
using base = ping_base;
public:
ping_example(endpoint& bus, const valid_if_positive<std::intmax_t>& max)
: main_ctx_object{EAGINE_ID(PingExampl), bus}
, base{bus}
, _max{extract_or(max, 100000)} {
object_description("Pinger", "Ping example");
}
void is_alive(const subscriber_info&) final {}
void on_subscribed(const subscriber_info& info, message_id sub_msg) final {
if(sub_msg == this->ping_msg_id()) {
if(_targets.try_emplace(info.endpoint_id, ping_stats{}).second) {
log_info("new pingable ${id} appeared")
.arg(EAGINE_ID(id), info.endpoint_id);
}
}
}
void on_unsubscribed(const subscriber_info& info, message_id sub_msg) final {
if(sub_msg == this->ping_msg_id()) {
log_info("pingable ${id} disappeared")
.arg(EAGINE_ID(id), info.endpoint_id);
}
}
void not_subscribed(const subscriber_info& info, message_id sub_msg) final {
if(sub_msg == this->ping_msg_id()) {
log_info("target ${id} is not pingable")
.arg(EAGINE_ID(id), info.endpoint_id);
}
}
void on_host_id_received(
const result_context& res_ctx,
valid_if_positive<host_id_t>&& host_id) final {
if(host_id) {
auto& stats = _targets[res_ctx.source_id()];
stats.host_id = extract(host_id);
}
}
void on_hostname_received(
const result_context& res_ctx,
valid_if_not_empty<std::string>&& hostname) final {
if(hostname) {
auto& stats = _targets[res_ctx.source_id()];
stats.hostname = extract(std::move(hostname));
}
}
void on_ping_response(
identifier_t pinger_id,
std::chrono::microseconds age,
auto& stats = _targets[pinger_id];
stats.responded++;
stats.min_time = std::min(stats.min_time, age);
stats.max_time = std::max(stats.max_time, age);
stats.sum_time += age;
stats.finish = std::chrono::steady_clock::now();
if(EAGINE_UNLIKELY((++_rcvd % _mod) == 0)) {
const auto now{std::chrono::steady_clock::now()};
const std::chrono::duration<float> interval{now - prev_log};
if(EAGINE_LIKELY(interval > decltype(interval)::zero())) {
const auto msgs_per_sec{float(_mod) / interval.count()};
log_chart_sample(EAGINE_ID(msgsPerSec), msgs_per_sec);
log_info("received ${rcvd} pongs")
.arg(EAGINE_ID(rcvd), _rcvd)
.arg(EAGINE_ID(interval), interval)
.arg(EAGINE_ID(msgsPerSec), msgs_per_sec)
.arg(EAGINE_ID(done), EAGINE_ID(Progress), 0, _rcvd, _max);
}
prev_log = now;
}
}
void on_ping_timeout(
identifier_t pinger_id,
std::chrono::microseconds) final {
auto& stats = _targets[pinger_id];
stats.timeouted++;
if(EAGINE_UNLIKELY((++_tout % _mod) == 0)) {
log_info("${tout} pongs timeouted").arg(EAGINE_ID(tout), _tout);
}
}
auto is_done() const noexcept -> bool {
return !(((_rcvd + _tout + _mod) < _max) || this->has_pending_pings());
}
auto update() -> bool {
some_true something_done{};
something_done(base::update());
if(EAGINE_UNLIKELY(_should_query_pingable)) {
log_info("searching for pingables");
query_pingables();
}
if(!_targets.empty()) {
for(auto& [pingable_id, entry] : _targets) {
if(_rcvd < _max) {
const auto lim{
_rcvd +
static_cast<std::intmax_t>(
_mod * (1 + std::log(float(1 + _targets.size()))))};
if(_sent < lim) {
this->ping(pingable_id, std::chrono::seconds(5));
if(EAGINE_UNLIKELY((++_sent % _mod) == 0)) {
log_info("sent ${sent} pings")
.arg(EAGINE_ID(sent), _sent);
}
if(EAGINE_UNLIKELY(entry.should_check_info)) {
if(!entry.host_id) {
this->query_host_id(pingable_id);
}
if(entry.hostname.empty()) {
this->query_hostname(pingable_id);
}
}
something_done();
}
} else {
break;
}
}
}
something_done(base::process_all() > 0);
return something_done;
}
void shutdown() {
for(auto& entry : _targets) {
this->shutdown_one(std::get<0>(entry));
}
}
void log_stats() {
const string_view not_avail{"N/A"};
for(auto& [id, info] : _targets) {
log_stat("pingable ${id} stats:")
.arg(EAGINE_ID(id), id)
.arg(EAGINE_ID(hostId), info.host_id)
.arg(EAGINE_ID(hostname), info.hostname)
.arg(EAGINE_ID(minTime), info.min_time)
.arg(EAGINE_ID(maxTime), info.max_time)
.arg(EAGINE_ID(avgTime), info.avg_time())
.arg(EAGINE_ID(responded), info.responded)
.arg(EAGINE_ID(timeouted), info.timeouted)
.arg(EAGINE_ID(duration), info.time_interval())
.arg(
EAGINE_ID(rspdRate),
EAGINE_ID(Ratio),
info.respond_rate(),
not_avail)
.arg(
EAGINE_ID(rspdPerSec),
EAGINE_ID(RatePerSec),
info.responds_per_second(),
not_avail);
}
}
private:
resetting_timeout _should_query_pingable{std::chrono::seconds(2), nothing};
std::chrono::steady_clock::time_point prev_log{
std::chrono::steady_clock::now()};
std::map<identifier_t, ping_stats> _targets{};
std::intmax_t _mod{5000};
std::intmax_t _max{100000};
std::intmax_t _sent{0};
std::intmax_t _rcvd{0};
std::intmax_t _tout{0};
};
//------------------------------------------------------------------------------
} // namespace msgbus
/// @brief Example entry point running on the prepared main context.
///
/// Sets up the bus endpoint and its connectors, runs the pinger until it
/// reports being done, then shuts the targets down and logs the statistics.
/// The number of pings can be given with the `--ping-count` argument.
auto main(main_ctx& ctx) -> int {
    ctx.preinitialize();

    msgbus::router_address router_addr{ctx};
    msgbus::connection_setup connectors(ctx);
    msgbus::endpoint ping_endpoint{EAGINE_ID(PingEndpt), ctx};

    // Stays invalid (and the pinger falls back to its default) unless the
    // argument is present and parses to a positive number.
    valid_if_positive<std::intmax_t> requested_count{};
    if(auto count_arg{ctx.args().find("--ping-count")}) {
        count_arg.next().parse(requested_count, ctx.log().error_stream());
    }

    msgbus::ping_example pinger_app{ping_endpoint, requested_count};
    connectors.setup_connectors(pinger_app, router_addr);

    resetting_timeout chart_stats_due{std::chrono::seconds(15), nothing};

    while(!pinger_app.is_done()) {
        pinger_app.process_all();
        if(pinger_app.update()) {
            // Something was done; go straight to the next round.
            continue;
        }

        // Idle round: back off briefly and use the spare time to sample
        // system load and temperature when the stats timeout has expired.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        if(chart_stats_due) {
            pinger_app.log_chart_sample(
              EAGINE_ID(shortLoad), ctx.system().short_average_load());
            pinger_app.log_chart_sample(
              EAGINE_ID(longLoad), ctx.system().long_average_load());
            if(auto temp_k{ctx.system().cpu_temperature()}) {
                pinger_app.log_chart_sample(
                  EAGINE_ID(cpuTempC),
                  extract(temp_k).to<units::degree_celsius>());
            }
        }
    }

    pinger_app.shutdown();
    pinger_app.log_stats();
    return 0;
}
//------------------------------------------------------------------------------
} // namespace eagine
/// @brief Program entry point; sets the application id and hands control
///        to the eagine main implementation.
auto main(int argc, const char** argv) -> int {
    // NOTE(review): the declaration of `options` was missing from this
    // listing. It is restored as the main-context options structure whose
    // `app_id` member is assigned below -- confirm the type name.
    eagine::main_ctx_options options;
    options.app_id = EAGINE_ID(PingExe);
    return eagine::main_impl(argc, argv, options);
}
basic_string_span< const char > string_view
Alias for const string views.
Definition: string_span.hpp:116
@ host_id
The host identifier has appeared or changed.
#define EAGINE_ID(NAME)
Macro for constructing instances of eagine::identifier.
Definition: identifier.hpp:353
bitfield< verification_bit > verification_bits
Alias for a bus message verification bitfield.
Definition: verification.hpp:47
Common code is placed in this namespace.
Definition: eagine.hpp:21
static constexpr auto extract(api_result_value< Result, api_result_validity::never > &) noexcept -> Result &
Overload of extract for api_result_value.
Definition: c_api_wrap.hpp:270
@ info
Informational log entries.
auto log_chart_sample(identifier series, float value) noexcept -> named_logging_object &
Stores a new value in the specified chart data series.
Definition: logger.hpp:372
auto update() const noexcept -> bool
Updates the internal endpoint state (should be called repeatedly).
Definition: subscriber.hpp:51
@ endpoint
Message bus client endpoint.
main_ctx_object(identifier obj_id, main_ctx_parent parent) noexcept
Initialization from object id and parent.
Definition: main_ctx_object.hpp:77
std::uint32_t host_id_t
Unique host identifier type.
Definition: identifier_t.hpp:25
static auto find(basic_span< T1, P1, S1 > where, basic_span< T2, P2, S2 > what) -> basic_span< T1, P1, S1 >
Finds the position of the last occurrence of what in a span.
Definition: span_algo.hpp:374
auto log_stat(string_view format) noexcept
Create a log message entry for statistic, with specified format.
Definition: logger.hpp:335
auto log_info(string_view format) noexcept
Create a log message entry for information, with specified format.
Definition: logger.hpp:329
basic_address< false > address
Type alias for non-const memory address values.
Definition: address.hpp:203
std::uint32_t message_sequence_t
Alias for message sequence number type.
Definition: types.hpp:22
static constexpr auto ratio(T a, T b) noexcept -> optionally_valid< T >
Returns a divided by b if b is not zero.
Definition: functions.hpp:75
void object_description(string_view display_name, string_view description) noexcept
Sets the human-readable name and description of this object.
Definition: logger.hpp:303
static auto zero(basic_span< T, P, S > spn) -> std::enable_if_t< std::is_integral_v< T >||std::is_floating_point_v< T >, basic_span< T, P, S >>
Fills a span with zero value of type T.
Definition: span_algo.hpp:548
auto process_all() -> span_size_t
Handles (and removes) all pending received messages.
Definition: subscriber.hpp:430
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
auto bus() noexcept -> auto &
Returns a reference to the associated endpoint.
Definition: subscriber.hpp:38
identifier app_id
The application root logger identifier.
Definition: main_ctx.hpp:30
@ message_id
The message type id has been verified.
Structure storing customization options for main context.
Definition: main_ctx.hpp:24
static constexpr nothing_t nothing
Constant of nothing_t type.
Definition: nothing.hpp:30

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).