OGLplus  (0.59.0) a C++ wrapper for rendering APIs

skeleton.hpp
Go to the documentation of this file.
1 
9 #ifndef EAGINE_MESSAGE_BUS_SKELETON_HPP
10 #define EAGINE_MESSAGE_BUS_SKELETON_HPP
11 
12 #include "../callable_ref.hpp"
13 #include "../flat_map.hpp"
14 #include "../serialize/block_sink.hpp"
15 #include "../serialize/block_source.hpp"
16 #include "../workshop.hpp"
17 #include "endpoint.hpp"
18 #include "serialize.hpp"
19 #include <array>
20 #include <map>
21 #include <tuple>
22 
23 namespace eagine::msgbus {
24 //------------------------------------------------------------------------------
25 template <
26  typename Signature,
27  typename Serializer,
28  typename Deserializer,
29  typename Sink,
30  typename Source,
31  std::size_t MaxDataSize>
32 class skeleton {
33 public:
34  auto call(
35  const message_context& msg_ctx,
36  const stored_message& request,
37  message_id response_id,
38  memory::block buffer,
39  callable_ref<Signature> func) -> bool {
40  return _do_call(
41  msg_ctx, request, response_id, buffer, func, func.argument_tuple());
42  }
43 
44  auto call(
45  const message_context& msg_ctx,
46  const stored_message& request,
47  message_id response_id,
48  callable_ref<Signature> func) -> bool {
49  std::array<byte, MaxDataSize> buffer{};
50  return call(msg_ctx, request, response_id, cover(buffer), func);
51  }
52 
53 private:
54  template <typename... Params>
55  auto _do_call(
56  const message_context& msg_ctx,
57  const stored_message& request,
58  message_id response_id,
59  memory::block buffer,
60  callable_ref<Signature> func,
61  std::tuple<Params...> args) -> bool {
62 
63  _source.reset(request.content());
64  Deserializer read_backend(_source);
65 
66  if(request.has_serializer_id(read_backend.type_id())) {
67  const auto read_errors = deserialize(args, read_backend);
68  if(!read_errors) {
69  const auto result{std::apply(func, args)};
70  _sink.reset(buffer);
71  Serializer write_backend(_sink);
72 
73  const auto errors = serialize(result, write_backend);
74  EAGINE_ASSERT(!errors);
75  EAGINE_MAYBE_UNUSED(errors);
76  message_view msg_out{_sink.done()};
77  msg_out.set_serializer_id(write_backend.type_id());
78  msg_ctx.bus().respond_to(request, response_id, msg_out);
79  return true;
80  }
81  }
82  return false;
83  }
84 
85  auto _do_call(
86  const message_context& msg_ctx,
87  const stored_message& request,
88  message_id response_id,
89  memory::block buffer,
90  callable_ref<Signature> func,
91  std::tuple<>) -> bool {
92 
93  const auto result{func()};
94  _sink.reset(buffer);
95  Serializer write_backend(_sink);
96 
97  const auto errors = serialize(result, write_backend);
98  EAGINE_ASSERT(!errors);
99  EAGINE_MAYBE_UNUSED(errors);
100  message_view msg_out{_sink.done()};
101  msg_out.set_serializer_id(write_backend.type_id());
102  msg_ctx.bus().respond_to(request, response_id, msg_out);
103  return true;
104  }
105 
106 private:
107  Source _source{};
108  Sink _sink{};
109 };
110 //------------------------------------------------------------------------------
111 template <
112  typename Signature,
113  typename Serializer,
114  typename Deserializer,
115  typename Sink,
116  typename Source,
117  std::size_t MaxDataSize>
118 class function_skeleton
119  : public skeleton<Signature, Serializer, Deserializer, Sink, Source, MaxDataSize> {
120 
121  using _function_t = callable_ref<Signature>;
122 
123 public:
124  function_skeleton() noexcept = default;
125 
126  function_skeleton(
127  message_id response_id,
128  callable_ref<Signature> function) noexcept
129  : _response_id{std::move(response_id)}
130  , _function{std::move(function)} {}
131 
132  template <typename Class, typename MfcT, MfcT Mfc>
133  auto operator()(
134  message_id response_id,
135  Class* that,
136  member_function_constant<MfcT, Mfc> func) -> function_skeleton& {
137  _response_id = std::move(response_id);
138  _function = _function_t{that, func};
139  return *this;
140  }
141 
142  template <typename Class, typename MfcT, MfcT Mfc>
143  auto operator()(
144  message_id response_id,
145  const Class* that,
146  member_function_constant<MfcT, Mfc> func) -> function_skeleton& {
147  _response_id = std::move(response_id);
148  _function = _function_t{that, func};
149  return *this;
150  }
151 
152  auto invoke_by(const message_context& msg_ctx, stored_message& request)
153  -> bool {
154  return this->call(msg_ctx.bus(), request, _response_id, _function);
155  }
156 
157  constexpr auto map_invoke_by(message_id msg_id) noexcept {
158  return std::tuple<
159  function_skeleton*,
160  message_handler_map<EAGINE_MEM_FUNC_T(function_skeleton, invoke_by)>>(
161  this, msg_id);
162  }
163 
164  constexpr auto operator[](message_id msg_id) noexcept {
165  return map_invoke_by(msg_id);
166  }
167 
168 private:
169  message_id _response_id{};
170  _function_t _function{};
171 };
172 //------------------------------------------------------------------------------
173 template <
174  typename Signature,
175  typename Serializer,
176  typename Deserializer,
177  typename Sink,
178  typename Source,
179  std::size_t MaxDataSize>
180 class lazy_skeleton {
181  using argument_tuple_type =
182  typename callable_ref<Signature>::argument_tuple_type;
183 
184 public:
185  using id_t = message_sequence_t;
186 
187  lazy_skeleton() noexcept = default;
188 
189  template <typename R, typename P>
190  lazy_skeleton(std::chrono::duration<R, P> default_timeout) noexcept
191  : _default_timeout{default_timeout} {}
192 
193  auto enqueue(
194  const stored_message& request,
195  message_id response_id,
196  callable_ref<Signature> func) -> bool {
197  auto [pos, emplaced] = _pending.try_emplace(request.sequence_no);
198 
199  if(emplaced) {
200  if constexpr(std::tuple_size_v<argument_tuple_type> > 0) {
201  _source.reset(request.content());
202  Deserializer read_backend(_source);
203 
204  if(request.has_serializer_id(read_backend.type_id())) {
205  auto& call = pos->second;
206  const auto read_errors =
207  deserialize(call.args, read_backend);
208  if(!read_errors) {
209  call.too_late.reset(_default_timeout);
210  call.response_id = response_id;
211  call.invoker_id = request.source_id;
212  call.func = func;
213  return true;
214  }
215  }
216  _pending.erase(pos);
217  } else {
218  auto& call = pos->second;
219  call.too_late.reset(_default_timeout);
220  call.response_id = response_id;
221  call.invoker_id = request.source_id;
222  call.func = func;
223  return true;
224  }
225  }
226  return false;
227  }
228 
229  auto handle_one(endpoint& bus, memory::block buffer) -> bool {
230  const auto bgn = _pending.begin();
231  auto pos = bgn;
232  while(pos != _pending.end()) {
233  const auto invocation_id = pos->first;
234  const auto& call = pos->second;
235  ++pos;
236  if(!call.too_late) {
237  const auto result{std::apply(call.func, call.args)};
238  _sink.reset(cover(buffer));
239  Serializer write_backend(_sink);
240 
241  const auto errors = serialize(result, write_backend);
242  EAGINE_ASSERT(!errors);
243  EAGINE_MAYBE_UNUSED(errors);
244  message_view msg_out{_sink.done()};
245  msg_out.set_serializer_id(write_backend.type_id());
246  msg_out.set_target_id(call.invoker_id);
247  msg_out.set_sequence_no(invocation_id);
248  bus.post(call.response_id, msg_out);
249  break;
250  }
251  }
252  if(bgn != pos) {
253  _pending.erase(bgn, pos);
254  return true;
255  }
256  return false;
257  }
258 
259  auto handle_one(endpoint& bus) -> bool {
260  std::array<byte, MaxDataSize> buffer{};
261  return handle_one(bus, cover(buffer));
262  }
263 
264 private:
265  std::chrono::milliseconds _default_timeout{1000};
266  Source _source{};
267  Sink _sink{};
268 
269  struct lazy_call {
270  message_id response_id{};
271  argument_tuple_type args{};
272  callable_ref<Signature> func{};
273  timeout too_late{};
274  identifier_t invoker_id{};
275  };
276 
277  flat_map<id_t, lazy_call> _pending{};
278 };
279 //------------------------------------------------------------------------------
280 template <
281  typename Signature,
282  typename Serializer,
283  typename Deserializer,
284  typename Sink,
285  typename Source,
286  std::size_t MaxDataSize>
287 class async_skeleton {
288  using argument_tuple_type =
289  typename callable_ref<Signature>::argument_tuple_type;
290 
291 public:
292  using id_t = message_sequence_t;
293 
294  async_skeleton() noexcept = default;
295 
296  auto enqueue(
297  const stored_message& request,
298  message_id response_id,
299  callable_ref<Signature> func,
300  workshop& workers) -> bool {
301  auto [pos, emplaced] = _pending.try_emplace(request.sequence_no);
302 
303  if(emplaced) {
304  if constexpr(std::tuple_size_v<argument_tuple_type>) {
305  _source.reset(request.content());
306  Deserializer read_backend(_source);
307 
308  if(request.has_serializer_id(read_backend.type_id())) {
309  auto& call = pos->second;
310  const auto read_errors =
311  deserialize(call.args, read_backend);
312  if(!read_errors) {
313  call.response_id = response_id;
314  call.invoker_id = request.source_id;
315  call.func = func;
316  workers.enqueue(call);
317  return true;
318  }
319  }
320  _pending.erase(pos);
321  } else {
322  auto& call = pos->second;
323  call.response_id = response_id;
324  call.invoker_id = request.source_id;
325  call.func = func;
326  workers.enqueue(call);
327  return true;
328  }
329  }
330  return false;
331  }
332 
333  auto handle_one(endpoint& bus, memory::block buffer) -> bool {
334  const auto bgn = _pending.begin();
335  auto pos = bgn;
336  while(pos != _pending.end()) {
337  const auto invocation_id = pos->first;
338  const auto& call = pos->second;
339  if(call.finished) {
340  _sink.reset(buffer);
341  Serializer write_backend(_sink);
342 
343  const auto errors = serialize(call.result, write_backend);
344  if(!errors) {
345  message_view msg_out{_sink.done()};
346  msg_out.set_serializer_id(write_backend.type_id());
347  msg_out.set_target_id(call.invoker_id);
348  msg_out.set_sequence_no(invocation_id);
349  bus.post(call.response_id, msg_out);
350  }
351  _pending.erase(pos);
352  return true;
353  }
354  ++pos;
355  }
356  return false;
357  }
358 
359  auto handle_one(endpoint& bus) -> bool {
360  std::array<byte, MaxDataSize> buffer{};
361  return handle_one(bus, cover(buffer));
362  }
363 
364 private:
365  Source _source{};
366  Sink _sink{};
367 
368  struct async_call : work_unit {
369  message_id response_id{};
370  argument_tuple_type args{};
371  callable_ref<Signature> func{};
372 
373  using result_type = std::remove_cv_t<
374  std::remove_reference_t<decltype(std::apply(func, args))>>;
375  result_type result{};
376 
377  identifier_t invoker_id{};
378  bool finished{false};
379 
380  auto do_it() -> bool final {
381  result = std::apply(func, args);
382  return true;
383  }
384 
385  void deliver() final {
386  finished = true;
387  }
388  };
389 
390  std::map<id_t, async_call> _pending{};
391 };
392 //------------------------------------------------------------------------------
393 } // namespace eagine::msgbus
394 
395 #endif // EAGINE_MESSAGE_BUS_SKELETON_HPP
static constexpr auto cover(T *addr, S size) noexcept -> span_if_mutable< T >
Creates a span starting at the specified pointer and specified length.
Definition: span.hpp:465
basic_callable_ref< Sig, is_noexcept_function_v< Sig > > callable_ref
Alias for callable object references.
Definition: callable_ref.hpp:191
basic_block< false > block
Alias for non-const byte memory span.
Definition: block.hpp:27
auto deserialize(T &value, Backend &backend) -> std::enable_if_t< std::is_base_of_v< deserializer_backend, Backend >, deserialization_errors >
Deserializes a value with the specified serialization backend.
Definition: read.hpp:475
std::uint32_t message_sequence_t
Alias for message sequence number type.
Definition: types.hpp:22
auto serialize(T &value, Backend &backend) -> std::enable_if_t< std::is_base_of_v< serializer_backend, Backend >, serialization_errors >
Serializes a value with the specified serialization backend.
Definition: write.hpp:480
Message bus code is placed in this namespace.
Definition: eagine.hpp:58
#define EAGINE_MEM_FUNC_T(CLASS, FUNC)
Macro for instantiating the member_function_constant template.
Definition: mem_func_const.hpp:179
std::uint64_t identifier_t
The underlying integer type for eagine::identifier.
Definition: identifier_t.hpp:19
@ message_id
The message type id has been verified.

Copyright © 2015-2021 Matúš Chochlík.
<chochlik -at -gmail.com>
Documentation generated on Tue Apr 13 2021 by Doxygen (version 1.8.17).