9 #ifndef EAGINE_MESSAGE_BUS_SKELETON_HPP
10 #define EAGINE_MESSAGE_BUS_SKELETON_HPP
12 #include "../callable_ref.hpp"
13 #include "../flat_map.hpp"
14 #include "../serialize/block_sink.hpp"
15 #include "../serialize/block_source.hpp"
16 #include "../workshop.hpp"
28 typename Deserializer,
31 std::size_t MaxDataSize>
// Overload working on a caller-provided `buffer`: passes the message context,
// the request, the response id, the buffer and the callable on to the callee,
// together with the tuple returned by func.argument_tuple().
35 const message_context& msg_ctx,
36 const stored_message& request,
37 message_id response_id,
39 callable_ref<Signature> func) ->
bool {
// NOTE(review): presumably the type of the argument tuple selects between the
// overloads taking std::tuple<Params...> and std::tuple<> — confirm.
41 msg_ctx, request, response_id, buffer, func, func.argument_tuple());
// Convenience overload that does not take a buffer from the caller: it uses a
// local, zero-initialized array of MaxDataSize bytes and delegates to the
// call() overload that accepts a buffer.
45 const message_context& msg_ctx,
46 const stored_message& request,
47 message_id response_id,
48 callable_ref<Signature> func) ->
bool {
// Temporary storage local to this call; its size is fixed by MaxDataSize.
49 std::array<byte, MaxDataSize> buffer{};
// cover(buffer) presumably yields a block view over the whole array — confirm.
50 return call(msg_ctx, request, response_id,
cover(buffer), func);
// Variant for callables that take parameters: the arguments are deserialized
// from the request content into `args`, the callable is applied to them, and
// the serialized result is sent back as a response to the request.
54 template <
typename... Params>
56 const message_context& msg_ctx,
57 const stored_message& request,
58 message_id response_id,
60 callable_ref<Signature> func,
61 std::tuple<Params...> args) ->
bool {
// Point the data source at the request payload and read through Deserializer.
63 _source.reset(request.content());
64 Deserializer read_backend(_source);
// Continue only if the request carries the serializer id of this backend.
66 if(request.has_serializer_id(read_backend.type_id())) {
67 const auto read_errors =
deserialize(args, read_backend);
// Invoke the callable with the elements of `args` as its arguments.
69 const auto result{std::apply(func, args)};
71 Serializer write_backend(_sink);
73 const auto errors =
serialize(result, write_backend);
// Serialization of the result is expected to succeed; EAGINE_MAYBE_UNUSED
// presumably silences the unused-variable warning when the assertion is
// compiled out — confirm.
74 EAGINE_ASSERT(!errors);
75 EAGINE_MAYBE_UNUSED(errors);
// Wrap what was written to the sink, tag it with the serializer id and send
// it through the bus as the response to `request`.
76 message_view msg_out{_sink.done()};
77 msg_out.set_serializer_id(write_backend.type_id());
78 msg_ctx.bus().respond_to(request, response_id, msg_out);
// Variant selected by an empty argument tuple (std::tuple<>): the callable is
// invoked without arguments, so nothing is read from the request content here;
// the serialized result is sent back as a response to the request.
86 const message_context& msg_ctx,
87 const stored_message& request,
88 message_id response_id,
90 callable_ref<Signature> func,
91 std::tuple<>) ->
bool {
// No arguments to deserialize: call the function directly.
93 const auto result{func()};
95 Serializer write_backend(_sink);
97 const auto errors =
serialize(result, write_backend);
// Serialization of the result is expected to succeed; EAGINE_MAYBE_UNUSED
// presumably silences the unused-variable warning when the assertion is
// compiled out — confirm.
98 EAGINE_ASSERT(!errors);
99 EAGINE_MAYBE_UNUSED(errors);
// Wrap what was written to the sink, tag it with the serializer id and send
// it through the bus as the response to `request`.
100 message_view msg_out{_sink.done()};
101 msg_out.set_serializer_id(write_backend.type_id());
102 msg_ctx.bus().respond_to(request, response_id, msg_out);
114 typename Deserializer,
117 std::size_t MaxDataSize>
// Skeleton bound to a single callable: it stores the callable together with
// the id of the response message and invokes it through the call() member
// inherited from skeleton.
118 class function_skeleton
119 :
public skeleton<Signature, Serializer, Deserializer, Sink, Source, MaxDataSize> {
// Type of the stored callable reference.
121 using _function_t = callable_ref<Signature>;
// Default construction; `_function` gets its value from the member initializer.
124 function_skeleton() noexcept = default;
// Constructor storing the response message id and the callable to invoke.
127 message_id response_id,
129 : _response_id{std::move(response_id)}
130 , _function{std::move(
function)} {}
// Rebinds this skeleton: stores a new response id and builds the callable
// from the object `that` and the member function constant `func`.
// Declared to return function_skeleton&.
132 template <
typename Class,
typename MfcT, MfcT Mfc>
134 message_id response_id,
136 member_function_constant<MfcT, Mfc> func) -> function_skeleton& {
137 _response_id = std::move(response_id);
138 _function = _function_t{that, func};
// Second rebinding overload with the same statements as the previous one.
// NOTE(review): presumably the two overloads differ only in how `that` is
// passed — confirm.
142 template <
typename Class,
typename MfcT, MfcT Mfc>
144 message_id response_id,
146 member_function_constant<MfcT, Mfc> func) -> function_skeleton& {
147 _response_id = std::move(response_id);
148 _function = _function_t{that, func};
// Handles `request` by passing it, the stored response id and the stored
// callable to the inherited call().
// NOTE(review): the first argument passed is msg_ctx.bus() rather than msg_ctx
// itself — confirm this matches the parameter type call() expects.
152 auto invoke_by(
const message_context& msg_ctx, stored_message& request)
154 return this->call(msg_ctx.bus(), request, _response_id, _function);
// NOTE(review): presumably associates `msg_id` with invoke_by for handler
// registration — confirm against the function body.
157 constexpr
auto map_invoke_by(message_id msg_id) noexcept {
// Subscript shorthand: returns whatever map_invoke_by(msg_id) returns.
164 constexpr
auto operator[](message_id msg_id) noexcept {
165 return map_invoke_by(msg_id);
// The callable handed to call() by invoke_by.
170 _function_t _function{};
176 typename Deserializer,
179 std::size_t MaxDataSize>
// Skeleton that records incoming calls in `_pending`; the stored callable is
// applied to the stored arguments later, from handle_one(), and the result is
// posted back to the invoker.
180 class lazy_skeleton {
// Tuple type provided by callable_ref<Signature> for the call arguments.
181 using argument_tuple_type =
182 typename callable_ref<Signature>::argument_tuple_type;
187 lazy_skeleton() noexcept = default;
// Constructs the skeleton with a caller-chosen default timeout, which is the
// value later passed to too_late.reset() for each registered call.
189 template <typename R, typename P>
190 lazy_skeleton(std::chrono::duration<R, P> default_timeout) noexcept
191 : _default_timeout{default_timeout} {}
// Registers `request` as a pending call keyed by its sequence number and
// records the response id, the invoker id and the timeout for it.
194 const stored_message& request,
195 message_id response_id,
196 callable_ref<Signature> func) ->
bool {
// try_emplace does not create a second entry if the sequence number is
// already present; `emplaced` reports whether a new entry was created.
197 auto [pos, emplaced] = _pending.try_emplace(request.sequence_no);
// Signatures with at least one parameter need the request content read.
200 if constexpr(std::tuple_size_v<argument_tuple_type> > 0) {
201 _source.reset(request.content());
202 Deserializer read_backend(_source);
// Continue only if the request carries the serializer id of this backend.
204 if(request.has_serializer_id(read_backend.type_id())) {
205 auto& call = pos->second;
206 const auto read_errors =
209 call.too_late.reset(_default_timeout);
210 call.response_id = response_id;
211 call.invoker_id = request.source_id;
// Same bookkeeping without any deserialization; presumably the branch taken
// for callables without parameters — confirm.
218 auto& call = pos->second;
219 call.too_late.reset(_default_timeout);
220 call.response_id = response_id;
221 call.invoker_id = request.source_id;
// Processes pending calls using the caller-provided `buffer`: applies the
// stored callable to the stored arguments, serializes the result into the
// buffer and posts it to the invoker under the call's sequence number.
229 auto handle_one(endpoint& bus,
memory::block buffer) ->
bool {
230 const auto bgn = _pending.begin();
232 while(pos != _pending.end()) {
233 const auto invocation_id = pos->first;
234 const auto& call = pos->second;
// This is the point where the deferred call is actually evaluated.
237 const auto result{std::apply(call.func, call.args)};
238 _sink.reset(
cover(buffer));
239 Serializer write_backend(_sink);
241 const auto errors =
serialize(result, write_backend);
// Serialization of the result is expected to succeed; EAGINE_MAYBE_UNUSED
// presumably silences the unused-variable warning when the assertion is
// compiled out — confirm.
242 EAGINE_ASSERT(!errors);
243 EAGINE_MAYBE_UNUSED(errors);
// Address the response to the original invoker and reuse the key of the
// pending entry as the sequence number.
244 message_view msg_out{_sink.done()};
245 msg_out.set_serializer_id(write_backend.type_id());
246 msg_out.set_target_id(call.invoker_id);
247 msg_out.set_sequence_no(invocation_id);
248 bus.post(call.response_id, msg_out);
// Drop the entries in the range [bgn, pos).
253 _pending.erase(bgn, pos);
// Convenience overload using a local, zero-initialized buffer of MaxDataSize
// bytes instead of one supplied by the caller.
259 auto handle_one(endpoint& bus) ->
bool {
260 std::array<byte, MaxDataSize> buffer{};
261 return handle_one(bus,
cover(buffer));
// Timeout applied to newly registered calls; 1000 ms unless overridden
// through the constructor.
265 std::chrono::milliseconds _default_timeout{1000};
// Arguments and callable stored per pending call; read as call.args and
// call.func in handle_one().
271 argument_tuple_type args{};
272 callable_ref<Signature> func{};
// Pending calls keyed by the sequence number of the originating request.
277 flat_map<id_t, lazy_call> _pending{};
283 typename Deserializer,
286 std::size_t MaxDataSize>
// Skeleton that registers each incoming call in `_pending`, hands it to a
// workshop as a work unit, and sends the stored results from handle_one().
287 class async_skeleton {
// Tuple type provided by callable_ref<Signature> for the call arguments.
288 using argument_tuple_type =
289 typename callable_ref<Signature>::argument_tuple_type;
294 async_skeleton() noexcept = default;
// Registers `request` as a pending call keyed by its sequence number, records
// the response id and invoker id, and enqueues the call into `workers`.
297 const stored_message& request,
298 message_id response_id,
300 workshop& workers) ->
bool {
// try_emplace does not create a second entry if the sequence number is
// already present; `emplaced` reports whether a new entry was created.
301 auto [pos, emplaced] = _pending.try_emplace(request.sequence_no);
// A non-zero tuple size means the callable takes parameters, which then have
// to be read from the request content.
304 if constexpr(std::tuple_size_v<argument_tuple_type>) {
305 _source.reset(request.content());
306 Deserializer read_backend(_source);
// Continue only if the request carries the serializer id of this backend.
308 if(request.has_serializer_id(read_backend.type_id())) {
309 auto& call = pos->second;
310 const auto read_errors =
313 call.response_id = response_id;
314 call.invoker_id = request.source_id;
// Hand the call over to the workshop for execution.
316 workers.enqueue(call);
// Same bookkeeping without any deserialization; presumably the branch taken
// for callables without parameters — confirm.
322 auto& call = pos->second;
323 call.response_id = response_id;
324 call.invoker_id = request.source_id;
326 workers.enqueue(call);
// Sends stored call results using the caller-provided `buffer`: serializes
// call.result and posts it to the invoker under the call's sequence number.
// NOTE(review): presumably only entries whose `finished` flag is set are sent
// — confirm.
333 auto handle_one(endpoint& bus,
memory::block buffer) ->
bool {
334 const auto bgn = _pending.begin();
336 while(pos != _pending.end()) {
337 const auto invocation_id = pos->first;
338 const auto& call = pos->second;
341 Serializer write_backend(_sink);
// The value serialized here is the one stored by async_call::do_it().
343 const auto errors =
serialize(call.result, write_backend);
// Address the response to the original invoker and reuse the key of the
// pending entry as the sequence number.
345 message_view msg_out{_sink.done()};
346 msg_out.set_serializer_id(write_backend.type_id());
347 msg_out.set_target_id(call.invoker_id);
348 msg_out.set_sequence_no(invocation_id);
349 bus.post(call.response_id, msg_out);
// Convenience overload using a local, zero-initialized buffer of MaxDataSize
// bytes instead of one supplied by the caller.
359 auto handle_one(endpoint& bus) ->
bool {
360 std::array<byte, MaxDataSize> buffer{};
361 return handle_one(bus,
cover(buffer));
// One asynchronous invocation, usable as a work_unit: holds the arguments,
// the callable and the result produced by applying one to the other.
368 struct async_call : work_unit {
370 argument_tuple_type args{};
371 callable_ref<Signature> func{};
// Type of std::apply(func, args) with reference and cv-qualifiers removed,
// so the result can be stored by value.
373 using result_type = std::remove_cv_t<
374 std::remove_reference_t<decltype(std::apply(func, args))>>;
375 result_type result{};
// Starts out false.
// NOTE(review): presumably set once the result is available — confirm.
378 bool finished{
false};
// Work-unit step: applies the callable to the stored arguments and keeps the
// returned value in `result`.
380 auto do_it() ->
bool final {
381 result = std::apply(func, args);
385 void deliver() final {
// Pending calls keyed by the sequence number of the originating request.
// NOTE(review): std::map keeps references to its elements valid across
// insertions, which presumably matters because calls are handed to the
// workshop while they remain stored here — confirm.
390 std::map<id_t, async_call> _pending{};
395 #endif // EAGINE_MESSAGE_BUS_SKELETON_HPP