/workspaces/astro/sol3-sdk/cpp/sol3/core/message_input.h Source File

Space-ng SDK: /workspaces/astro/sol3-sdk/cpp/sol3/core/message_input.h Source File
Space-ng SDK
message_input.h
Go to the documentation of this file.
1 // Copyright (c) Space-ng, inc. All rights reserved.
2 #pragma once
6 
7 #include <flatbuffers/flatbuffers.h>
8 #include <flatbuffers/verifier.h>
9 #include <spdlog/spdlog.h>
10 
11 #include <chrono>
12 
13 namespace sol3::core {
14 
22 template <typename TMessageSpec>
23 class MessageInput : public AnyMessageInput {
24  static_assert(
26  "TMessageSpec must be sol3::core::MessageSpec<...>");
27 
28  public:
30  using TableType = typename TMessageSpec::TableType;
31  using NativeTableType = typename TableType::NativeTableType;
32 
38  explicit MessageInput(IBufferExchange& exchange, uint32_t component_id)
40  exchange,
41  TMessageSpec::makeMessagePort(component_id),
43  NativeTableType::GetSchemaData(),
44  MessageSchema::EvolutionPolicy::FORWARDS_COMPATIBLE)) {}
45 
49  explicit MessageInput(
50  IBufferExchange& exchange,
52  uint32_t component_id)
54  exchange,
55  std::move(endpoint),
56  TMessageSpec::makeMessagePort(component_id),
58  NativeTableType::GetSchemaData(),
59  MessageSchema::EvolutionPolicy::FORWARDS_COMPATIBLE)) {}
60 
61  MessageInput(MessageInput&& rhs) = delete;
63  MessageInput(MessageInput const&) = delete;
64  MessageInput& operator=(MessageInput const& rhs) = delete;
65  ~MessageInput() = default;
66 
75  uint64_t seq, std::chrono::steady_clock::duration time_out = {}) {
76  return AnyMessageInput::read(seq, time_out);
77  }
78 
85  std::chrono::steady_clock::duration time_out = {}) {
86  return AnyMessageInput::readHead(time_out);
87  }
88 };
89 
90 template <typename TMessageSpec>
93  bool success = false;
94 };
95 
96 template <typename TMessageSpec, typename TPredicate>
99  std::chrono::nanoseconds timeout,
100  TPredicate predicate,
101  std::chrono::nanoseconds poll_interval = std::chrono::milliseconds(10)) {
102  auto start_time = std::chrono::steady_clock::now();
104  result.view = input.readHead();
105  // first predicate test, we want to test regardless if this is a cached read.
106  if (result.view.hasValue()) {
107  result.success = predicate(result.view);
108  }
109  if (result.success) {
110  return result;
111  }
112  // poll head, and only evaluate predicate if the result is cached.
113  do {
114  std::this_thread::sleep_for(poll_interval);
115  result.view = input.readHead();
116  if (result.view.hasValue() && !result.view.cached()) {
117  result.success = predicate(result.view);
118  }
119  } while (!result.success &&
120  (std::chrono::steady_clock::now() - start_time) < timeout);
121 
122  return result;
123 }
124 
125 } // namespace sol3::core
Definition: any_message_input.h:29
AnyMessageView read(uint64_t seq, std::chrono::steady_clock::duration time_out={})
IBufferExchange::Endpoint const & endpoint() const
AnyMessageView readHead(std::chrono::steady_clock::duration time_out={})
Interface for registering, discovering, and disposing shared buffers.
Definition: shmem_buffer.h:112
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:114
Definition: message_input.h:23
MessageInput & operator=(MessageInput &&rhs)=delete
typename TableType::NativeTableType NativeTableType
Definition: message_input.h:31
MessageInput(MessageInput const &)=delete
MessageView< TMessageSpec > read(uint64_t seq, std::chrono::steady_clock::duration time_out={})
Definition: message_input.h:74
MessageInput(MessageInput &&rhs)=delete
MessageInput(IBufferExchange &exchange, uint32_t component_id)
Definition: message_input.h:38
MessageView< TMessageSpec > readHead(std::chrono::steady_clock::duration time_out={})
Definition: message_input.h:84
typename TMessageSpec::TableType TableType
FlatBuffers table types derived from the message spec.
Definition: message_input.h:30
MessageInput & operator=(MessageInput const &rhs)=delete
MessageInput(IBufferExchange &exchange, IBufferExchange::Endpoint endpoint, uint32_t component_id)
Definition: message_input.h:49
Definition: message_schema.h:12
Definition: message_view.h:14
Definition: any_message_input.h:15
MessageViewSuccess< TMessageSpec > waitUntil(MessageInput< TMessageSpec > &input, std::chrono::nanoseconds timeout, TPredicate predicate, std::chrono::nanoseconds poll_interval=std::chrono::milliseconds(10))
Definition: message_input.h:97
Definition: shmem_buffer.h:219
Definition: message_input.h:91
MessageView< TMessageSpec > view
Definition: message_input.h:92
bool success
Definition: message_input.h:93
Definition: message_spec.h:44