/workspaces/astro/sol3-sdk/cpp/sol3/core/any_message_input.h Source File

Space-ng SDK: /workspaces/astro/sol3-sdk/cpp/sol3/core/any_message_input.h Source File
Space-ng SDK
any_message_input.h
Go to the documentation of this file.
1 // Copyright (c) Space-ng, inc. All rights reserved.
2 #pragma once
7 
8 #include <flatbuffers/flatbuffers.h>
9 #include <spdlog/spdlog.h>
10 
11 #include <chrono>
12 #include <memory>
13 #include <optional>
14 
15 namespace sol3::core {
16 
30  public:
// Constructs an input bound to the given buffer exchange, with an optional expected
// schema (defaults to a default-constructed MessageSchema).
// NOTE(review): the member index further down this page lists this overload as
// (IBufferExchange& exchange, IBufferExchange::Endpoint endpoint, MessagePort port,
//  MessageSchema expected_schema = MessageSchema()). The `endpoint` and `port`
// parameter lines (listing lines 38-39) appear to be missing from this listing —
// confirm against the original header before relying on the signature shown here.
36  explicit AnyMessageInput(
37  IBufferExchange& exchange,
40  MessageSchema expected_schema = MessageSchema());
41 
// Constructor overload in which the expected schema is mandatory (no default argument).
// NOTE(review): the member index further down this page lists this overload as
// (IBufferExchange& exchange, MessagePort port, MessageSchema expected_schema).
// The `port` parameter line (listing line 47) appears to be missing from this
// listing — confirm against the original header.
45  explicit AnyMessageInput(
46  IBufferExchange& exchange,
48  MessageSchema expected_schema);
49 
// Copy construction and copy assignment are deleted; the destructor is defaulted.
// NOTE(review): the member index on this page also lists deleted move construction and
// move assignment (AnyMessageInput&&); those declarations (presumably listing lines
// 56-57) are not present in this listing — confirm against the original header.
52  AnyMessageInput(AnyMessageInput const&) = delete;
53  AnyMessageInput& operator=(AnyMessageInput const& rhs) = delete;
54  ~AnyMessageInput() = default;
55 
58 
// Number of slots in the underlying sequence ring.
60  size_t slotCount() const;
61 
// Maximum slot size in bytes in the underlying sequence ring.
63  size_t slotSize() const;
64 
// Resets the reader and clears cached message views.
66  void reset();
67 
// NOTE(review): this is the tail of a declaration whose opening line (listing line 75)
// is missing from this listing. The member index on this page gives the full signature
// as: AnyMessageView read(uint64_t seq, std::chrono::steady_clock::duration time_out = {}).
// `seq` is the sequence number to read; `time_out` defaults to a value-initialized
// duration. NOTE(review): presumably a zero time-out means "do not wait" — confirm.
76  uint64_t seq, std::chrono::steady_clock::duration time_out = {});
77 
// Overload of readInto that takes a caller-provided byte span as the destination.
//   seq      - sequence number of the message to read.
//   dest     - mutable span of bytes.
//              NOTE(review): presumably the message bytes are copied here — confirm.
//   time_out - optional duration; defaults to a value-initialized duration.
// Returns a size_t. NOTE(review): presumably the number of bytes written to `dest` —
// confirm against the implementation.
83  size_t readInto(
84  uint64_t seq,
85  flatbuffers::span<uint8_t> dest,
86  std::chrono::steady_clock::duration time_out = {});
87 
// Overload of readInto that takes an IStreamWriter instead of a byte span.
//   seq      - sequence number of the message to read.
//   stream   - stream writer passed by non-const reference.
//              NOTE(review): presumably the message bytes are appended to it — confirm.
//   time_out - optional duration; defaults to a value-initialized duration.
// Returns a size_t. NOTE(review): presumably the number of bytes written to `stream` —
// confirm against the implementation.
90  size_t readInto(
91  uint64_t seq,
92  IStreamWriter& stream,
93  std::chrono::steady_clock::duration time_out = {});
94 
// Returns an AnyMessageView; takes only an optional time-out (defaults to a
// value-initialized duration) and no sequence number.
// NOTE(review): presumably reads the message at the current head() sequence — confirm.
100  AnyMessageView readHead(std::chrono::steady_clock::duration time_out = {});
101 
// The current latest sequence number, or 0 if no messages.
103  uint64_t head() const;
104 
// The message port associated with this input (returned by value).
106  MessagePort port() const;
107 
109 
// Returns a const reference to a MessageSchema.
// NOTE(review): presumably the `schema_` member; the reference would then be valid only
// while this object is alive — confirm against the implementation.
110  MessageSchema const& schema() const;
111 
113 
114  private:
// Private helper: takes a shared pointer to a const buffer by value and returns a
// shared pointer of the same type.
// NOTE(review): the name suggests it installs `buffer` as the current buffer and
// returns the previous one — confirm against the implementation.
116  std::shared_ptr<IBufferConst> swapBuffer(
117  std::shared_ptr<IBufferConst> buffer);
118 
// Buffer exchange held by reference; this object does not own it.
// NOTE(review): presumably bound from the constructor's `exchange` argument, which
// would then have to outlive this object — confirm.
119  IBufferExchange& exchange_;
120 
// Optional endpoint; an empty optional means no endpoint is held.
// NOTE(review): presumably populated only by the constructor overload taking an endpoint.
121  std::optional<IBufferExchange::Endpoint> endpoint_;
// NOTE(review): presumably the value returned by port() — confirm.
122  MessagePort port_;
123 
// NOTE(review): presumably the reader over the underlying sequence ring, and the cached
// message views that reset() clears — confirm.
// NOTE(review): std::vector is used here, but <vector> is not among the includes visible
// in this listing — confirm it is included (directly or transitively).
124  SeqRingReader reader_;
125  std::vector<AnyMessageView> messages_;
126 
// Shared ownership of a const buffer.
// NOTE(review): presumably the buffer currently being read — confirm.
127  std::shared_ptr<IBufferConst> buffer_;
128 
// NOTE(review): presumably the schema returned by schema() — confirm.
129  MessageSchema schema_;
130 
// Initialized to 0 by its default member initializer.
// NOTE(review): meaning of the id is not visible from this listing.
131  uint64_t uuid_ = 0;
132 };
133 
134 } // namespace sol3::core
Definition: any_message_input.h:29
size_t readInto(uint64_t seq, flatbuffers::span< uint8_t > dest, std::chrono::steady_clock::duration time_out={})
uint64_t head() const
The current latest sequence number, or 0 if no messages.
bool pollForBufferUpdate()
Return true if the underlying buffer has been updated and is valid.
AnyMessageView read(uint64_t seq, std::chrono::steady_clock::duration time_out={})
MessagePort port() const
The message port associated with this input.
AnyMessageInput(AnyMessageInput &&rhs)=delete
MessageSchema const & schema() const
IBufferExchange::Endpoint const & endpoint() const
size_t slotSize() const
Maximum slot size in bytes in the underlying sequence ring.
AnyMessageInput(IBufferExchange &exchange, IBufferExchange::Endpoint endpoint, MessagePort port, MessageSchema expected_schema=MessageSchema())
void reset()
Resets the reader and clears cached message views.
AnyMessageInput & operator=(AnyMessageInput &&rhs)=delete
AnyMessageInput & operator=(AnyMessageInput const &rhs)=delete
size_t readInto(uint64_t seq, IStreamWriter &stream, std::chrono::steady_clock::duration time_out={})
size_t slotCount() const
Number of slots in the underlying sequence ring.
AnyMessageInput(AnyMessageInput const &)=delete
AnyMessageInput(IBufferExchange &exchange, MessagePort port, MessageSchema expected_schema)
AnyMessageView readHead(std::chrono::steady_clock::duration time_out={})
void updateSchema(MessageSchema const &schema)
Definition: any_message_view.h:24
Interface for registering, discovering, and disposing shared buffers.
Definition: shmem_buffer.h:112
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:114
Minimal append-only byte sink for streaming writes.
Definition: stream_writer.h:9
Definition: message_port.h:19
Definition: message_schema.h:12
Definition: seq_ring.h:176
Definition: any_message_input.h:15