/// Copyright Matus Chochlik.
/// Distributed under the Boost Software License, Version 1.0.
/// See accompanying file LICENSE_1_0.txt or copy at
///  http://www.boost.org/LICENSE_1_0.txt
#include <iostream>
#include <queue>
#include <set>
#include <thread>
namespace msgbus {
/// Server-side actor of the Fibonacci example, derived from actor<3>.
///
/// The class declares three member functions with the signature
/// (const message_context&, stored_message&) -> bool: shutdown, is_ready and
/// calculate. Presumably these are the three message handlers that the
/// template argument of actor<3> refers to -- TODO confirm against actor.
class fibonacci_server : public actor<3> {
public:
using this_class = fibonacci_server;
using base = actor<3>;
using base::bus;
// NOTE(review): the member-initializer list below has no visible constructor
// signature in front of it, the base(...) call is never closed and `parent`
// is not declared in the visible code. One or more lines appear to be
// missing here; restore them from the upstream file before building.
: base(
main_ctx_object{
EAGINE_ID(FibServer), parent},
this,
}
// Move constructor: forwards the moved-from object and `this` to base.
// NOTE(review): the base(...) argument list is not closed and the constructor
// has no body in the visible code; the trailing arguments appear to be
// missing as well.
fibonacci_server(fibonacci_server&& temp) noexcept
: base(
std::move(temp),
this,
// Copy construction and both assignment operators are deleted; the move
// constructor above is the only declared way to transfer an instance.
fibonacci_server(const fibonacci_server&) = delete;
auto operator=(fibonacci_server&& temp) =
delete;
auto operator=(
const fibonacci_server&) =
delete;
// Defaulted destructor, declared noexcept and final.
~fibonacci_server() noexcept final = default;
/// Marks this server as finished.
///
/// Both arguments are ignored. Raises the flag that is_done() reports and
/// always returns true.
auto shutdown(const message_context&, stored_message&) -> bool {
    _done = true;
    // The flag was just set, so this yields true.
    return _done;
}
/// Readiness handler: unconditionally returns true.
///
/// NOTE(review): neither the context nor msg_in is read in the visible body.
/// Presumably a statement that replied to msg_in is missing -- confirm
/// against the upstream file.
auto is_ready(const message_context&, stored_message& msg_in) -> bool {
return true;
}
/// Returns the arg-th Fibonacci number, where fib(1) == fib(2) == 1.
///
/// Every argument of 2 or less (zero and negative values included) yields 1.
/// The value is computed by plain double recursion, so the number of calls
/// grows exponentially with arg.
static auto fib(std::int64_t arg) noexcept -> std::int64_t {
    if(arg > 2) {
        const std::int64_t two_back = fib(arg - 2);
        const std::int64_t one_back = fib(arg - 1);
        return two_back + one_back;
    }
    return 1;
}
auto calculate(const message_context&, stored_message& msg_in) -> bool {
std::int64_t arg{0};
std::int64_t result{0};
auto tup = std::tie(arg, result);
block_data_source source(msg_in.content());
fast_deserializer_backend read_backend(source);
result = fib(arg);
EAGINE_MAYBE_UNUSED(result);
std::array<byte, 64> buffer{};
block_data_sink sink(
cover(buffer));
fast_serializer_backend write_backend(sink);
message_view msg_out{sink.done()};
msg_out.set_serializer_id(write_backend.type_id());
return true;
}
auto is_done() const noexcept {
return _done;
}
private:
/// Completion flag: starts out false, is set to true by shutdown() and is
/// returned by is_done().
bool _done{false};
};
/// Client-side actor of the Fibonacci example, derived from actor<2>.
///
/// Arguments are queued with enqueue(); dispatch() moves them from the
/// "remaining" queue into the "pending" set and print() writes a result line
/// and erases the corresponding entry. dispatch and print share the signature
/// (const message_context&, stored_message&) -> bool and are presumably the
/// two message handlers that actor<2> refers to -- TODO confirm.
class fibonacci_client : public actor<2> {
public:
using this_class = fibonacci_client;
using base = actor<2>;
using base::bus;
// NOTE(review): the member-initializer list below has no visible constructor
// signature in front of it, the base(...) call is never closed and `parent`
// is not declared in the visible code. One or more lines appear to be
// missing here; restore them from the upstream file before building.
: base(
main_ctx_object{
EAGINE_ID(FibClient), parent},
this,
}
/// Appends arg to the back of the queue of arguments that are still waiting
/// to be picked up by dispatch().
void enqueue(std::int64_t arg) {
    _remaining.emplace(arg);
}
/// Called by main() after all results have arrived.
///
/// NOTE(review): the body is empty in the visible code, so this call has no
/// effect. Presumably the statement that tells the servers to shut down is
/// missing -- confirm against the upstream file.
void shutdown() {
}
// NOTE(review): the statements below are not inside any visible function.
// main() calls client.update(), which is not declared anywhere in the visible
// class, so this is presumably the remainder of an update() member whose
// signature line and loop body are missing -- confirm. As written, the final
// brace closes the class early.
if(!_remaining.empty()) {
}
}
auto dispatch(const message_context&, stored_message& msg_in) -> bool {
if(!_remaining.empty()) {
auto arg = _remaining.front();
_pending.insert(arg);
_remaining.pop();
std::array<byte, 32> buffer{};
block_data_sink sink(
cover(buffer));
fast_serializer_backend write_backend(sink);
message_view msg_out{sink.done()};
msg_out.set_serializer_id(write_backend.type_id());
}
return true;
}
/// Handler that prints one "fib(<argument>) = <result>" line to std::cout and
/// removes the argument from the pending set.
///
/// The context argument is ignored and the function always returns true.
///
/// NOTE(review): in the visible code nothing is ever read from the
/// deserializer backend into `input` / `value`, so both are still 0 when they
/// are printed and erased. Presumably the deserialization statement is
/// missing -- confirm against the upstream file.
auto print(const message_context&, stored_message& msg_in) -> bool {
    std::int64_t input = 0;
    std::int64_t value = 0;
    auto input_and_value = std::tie(input, value);

    block_data_source response_data(msg_in.content());
    fast_deserializer_backend response_reader(response_data);

    // std::endl is kept so every result line is flushed as soon as it is
    // written.
    std::cout << "fib(" << input << ") = " << value << std::endl;

    _pending.erase(input);
    return true;
}
/// Returns true once no argument is waiting in the queue and no argument is
/// left in the pending set.
auto is_done() const {
    if(!_remaining.empty()) {
        return false;
    }
    return _pending.empty();
}
private:
/// Arguments added by enqueue() that dispatch() has not taken yet (FIFO).
std::queue<std::int64_t> _remaining{};
/// Arguments taken by dispatch(); print() erases the argument it reports.
/// Being a std::set, equal arguments occupy a single entry.
std::set<std::int64_t> _pending{};
};
}
/// Entry point of the example: starts server worker threads, queues the
/// arguments on a client, pumps the client until it reports is_done(), then
/// calls client.shutdown() and joins the workers.
///
/// NOTE(review): the braces in the visible body do not balance and `n` is
/// never declared, so some lines are evidently missing (see the notes below).
auto main(main_ctx& ctx) -> int {
// Number of workers: taken from cpu_concurrent_threads(); the literal 4 is
// presumably the fallback used by extract_or when no value is available --
// TODO confirm.
const auto thread_count =
extract_or(ctx.system().cpu_concurrent_threads(), 4);
msgbus::connection_setup conn_setup(ctx);
conn_setup.default_init();
msgbus::fibonacci_client client(ctx, conn_setup);
std::vector<std::thread> workers;
workers.reserve(thread_count);
// Each worker thread owns its server (init-captured by value into the mutable
// lambda) and keeps calling process_one() until the server's is_done()
// returns true.
workers.emplace_back(
[server{msgbus::fibonacci_server(ctx, conn_setup)}]() mutable {
while(!server.is_done()) {
server.process_one();
}
});
// NOTE(review): this brace has no visible opener. Presumably it closes a loop
// over thread_count around the emplace_back above whose header line is
// missing; as written only one worker is created and this brace ends main().
}
// NOTE(review): `n` is not declared anywhere in the visible code.
for(std::int64_t i = 1; i <= n; ++i) {
client.enqueue(i);
}
// Pump the client until nothing is left queued or pending.
while(!client.is_done()) {
client.update();
client.process_one();
}
client.shutdown();
// Wait for every worker thread to finish before returning.
for(auto& worker : workers) {
worker.join();
}
return 0;
}
}