/workspaces/astro/sol3-sdk/cpp/sol3/core/shmem_exchange.h Source File

Space-ng SDK: /workspaces/astro/sol3-sdk/cpp/sol3/core/shmem_exchange.h Source File
Space-ng SDK
shmem_exchange.h
Go to the documentation of this file.
1 // Copyright (c) Space-ng, inc. All rights reserved.
2 
3 #pragma once
5 #include "sol3/core/msg/exchange_config.h"
8 
9 #include <boost/asio.hpp>
10 #include <boost/asio/local/datagram_protocol.hpp>
11 
12 #include <atomic>
13 #include <cstddef>
14 #include <cstdint>
15 #include <map>
16 #include <memory>
17 #include <set>
18 #include <string>
19 #include <unordered_set>
20 #include <utility>
21 #include <vector>
22 namespace sol3::core {
23 
24 enum ExchangeCode : uint32_t {
29 };
30 
36 msg::ExchangeConfigT loadExchangeConfigFromEnv();
37 
40 msg::ExchangeConfigT loadExchangeConfigFromPath(std::string const& config_path);
41 
45  public:
49 
52  ShmemExchange(Endpoint const& local_endpoint);
53 
57  Endpoint const& local_endpoint, msg::ExchangeConfigT exchange_config);
59  ~ShmemExchange() override;
60 
61  // Non-copyable and non-movable: ShmemExchange owns IO and socket resources.
62  ShmemExchange(ShmemExchange const&) = delete;
66 
67  void start() override;
68  void stop() override;
69  bool stopped() const override;
70  void addPeer(Endpoint const& peer_endpoint) override;
72  uint32_t local_port,
73  Endpoint peer_endpoint,
74  uint32_t peer_port = 0) override;
75  void configure(msg::ExchangeConfigT const& config) override;
76  void registerBuffer(IBufferMutable const& buffer) override;
77  void unregisterBuffer(IBufferMutable const& buffer) override;
78  Endpoint const& localEndpoint() const override { return local_endpoint_; }
79  std::shared_ptr<IBufferConst> get(uint32_t port, uint32_t idx) const override;
80  std::shared_ptr<IBufferConst> get(
81  Endpoint const& ep, uint32_t port, uint32_t idx) const override;
82  void dispose(std::shared_ptr<IBufferConst>&& buffer) override;
83  void listPorts(std::vector<EndpointPort>& ports) const override;
85  std::map<uint32_t, std::vector<std::pair<Endpoint, uint32_t>>>& mappings)
86  const override;
88  std::map<PortIdx, std::shared_ptr<IBufferConst>>& buffers) const override;
89 
90  private:
91  struct ExchangePacket {
93  uint64_t peer_session_id;
94  } __attribute__((packed));
95 
96  static size_t constexpr kMaxRecvBuffer = 8192;
97  static size_t constexpr kMaxPayload = sizeof(uint64_t);
98  static size_t constexpr kMaxMissedPollsBeforeEvict = 4;
99  static size_t constexpr kMaxBuffers =
100  (kMaxRecvBuffer - sizeof(ExchangePacket)) / sizeof(BufferInfo);
101  // Process-lifetime peer identity for protocol sessions.
102  // Not a UUID: generated randomly at startup and changes on restart.
103  using PeerSessionId = uint64_t;
104 
105  using Socket = boost::asio::local::datagram_protocol::socket;
106  using ShmemBufferMap = std::map<PortIdx, std::shared_ptr<IBufferConst>>;
107 
108  // State information about downstream peers that are requesting
109  // buffers from this exchange.
110  struct SentState {
111  // buffers that have been registered to share.
112  ShmemBufferMap buffers_to_share;
113 
114  // A list of all peers that we're interacting with.
115  std::vector<Endpoint> peer_endpoints;
116 
117  // Track the last seen peer session id for each polling endpoint.
118  std::map<Endpoint, PeerSessionId> peer_session_ids;
119 
120  // Track which buffers we've sent to each requesting peer endpoint.
121  std::map<Endpoint, std::unordered_set<PortIdx>> sent_buffers_per_peer;
122  };
123 
124  // State information and buffers that are received from other peer endpoints
125  // in response to this exchange's explicit polling.
126  struct ReceivedState {
127  Endpoint local_endpoint;
128  // Track the last seen peer session id for each polled endpoint.
129  // Includes this exchange's endpoint.
130  std::map<Endpoint, PeerSessionId> peer_session_ids;
131 
132  // A list of all peers that we're interacting with.
133  // Includes this exchange's endpoint.
134  std::set<Endpoint> peer_endpoints;
135 
136  // Buffers that have been shared by each peer that we're polling
137  // Note: this will also include buffers that are registered with the
138  // exchange.
139  std::map<Endpoint, ShmemBufferMap> buffers_per_peer;
140 
141  // Track consecutive failed polls for each peer endpoint. Used to evict
142  // stale buffers when a preferred peer dies so fallback peers can take over.
143  std::map<Endpoint, uint32_t> peer_missed_polls;
144 
145  // Resolution mappings:
146  // - local_port != 0: explicit local-port to peer-port mappings.
147  // - local_port == 0: wildcard fallback mappings for any local port.
148  std::map<uint32_t, std::vector<std::pair<Endpoint, uint32_t>>>
149  local_port_to_peer_ports;
150 
152  std::shared_ptr<IBufferConst> buffer;
153  uint64_t generation = 0;
154  };
155 
158  std::shared_ptr<IBufferConst> buffer;
159  bool cache_needs_update = false;
160  uint64_t generation = 0;
161  };
162 
163  // Cache of resolved buffers, invalidated by resolve_generation.
164  std::map<PortIdx, ResolvedCacheEntry> resolved_buffers;
165  // Monotonically increasing counter; bumped whenever the authoritative
166  // buffer state changes, which obsoletes the resolved_buffers cache.
167  uint64_t resolve_generation = 0;
168 
169  // Bump the generation counter, invalidating all resolved_buffers entries.
170  void bumpResolvedGeneration();
171 
172  // Resolve from a mapping bucket in local_port_to_peer_ports. When
173  // mapping_local_port == 0 and mapped peer_port == 0, peer port falls back
174  // to requested_local_port.
175  std::shared_ptr<IBufferConst> resolveFromPortMappings(
176  uint32_t mapping_local_port,
177  uint32_t requested_local_port,
178  uint32_t idx) const;
179 
180  // Walk explicit port mappings, then wildcard fallback mappings
181  // (local_port_to_peer_ports[0]), then finally local endpoint, returning
182  // the first matching buffer for (port, idx).
183  std::shared_ptr<IBufferConst> resolveEntry(PortIdx local_port_idx) const;
184 
185  // Shared-lock lookup for (port, idx). Returns resolved buffer and whether
186  // the caller should schedule a deferred cache update.
187  GetResolvedResult getResolved(uint32_t port, uint32_t idx) const;
188 
189  // Exclusive-lock cache write for deferred updates from getResolved().
190  void updateResolvedCache(GetResolvedResult const& resolved);
191 
192  std::shared_ptr<IBufferConst> get(
193  Endpoint const& ep, uint32_t port, uint32_t idx) const;
194  };
195 
196  template <typename TFunction>
197  auto accessReceivedState(TFunction&& cb) const {
198  return received_state_.access(std::forward<TFunction>(cb));
199  }
200  void run();
201  void registerBuffer(std::shared_ptr<IBufferConst> const& buffer);
202  void unregisterBuffer(std::shared_ptr<IBufferConst> const& buffer);
203  void unregisterBuffer(BufferInfo const& info);
204  void startReceive();
205  void startPeerPoll();
206  void handleReceive();
207  void sendSomeBuffers(Endpoint const& sender_ep);
208  void requestPeerBuffers(Endpoint const& ep);
209  bool sendPacketOnStrand(
210  Endpoint const& ep,
212  uint8_t const* payload = nullptr,
213  size_t payload_size = 0);
214  void sendExchangePacket(
215  Endpoint const& ep,
217  void const* payload = nullptr,
218  size_t payload_size = 0);
219 
220  void handleBufferRequest(
221  Endpoint const& sender_ep,
222  std::array<char, ShmemExchange::kMaxRecvBuffer> const& msgbuf);
223  void handleIncomingBuffers(
224  Endpoint const& sender_ep,
225  std::array<char, ShmemExchange::kMaxRecvBuffer> const& msgbuf,
226  const struct msghdr& msgh);
227  void handleBufferRemoved(
228  Endpoint const& sender_ep,
229  std::array<char, ShmemExchange::kMaxRecvBuffer> const& msgbuf);
230  void handlePeerDisconnect(
231  Endpoint const& sender_ep,
232  std::array<char, ShmemExchange::kMaxRecvBuffer> const& msgbuf);
233  void sendBufferRemoved(Endpoint const& ep, PortIdx port_idx);
234  void sendDisconnect();
235 
236  PeerSessionId peer_session_id_;
237  boost::asio::io_context ctx_;
238  boost::asio::io_context::strand strand_;
239  boost::asio::steady_timer peer_poll_timer_;
240  Endpoint local_endpoint_;
241  Socket socket_;
242  cpp::ThreadSafeValue<ReceivedState> received_state_;
244  std::atomic<bool> stop_requested_{false};
245 
246  std::thread run_thread_;
247 };
248 
249 } // namespace sol3::core
Interface for registering, discovering, and disposing shared buffers.
Definition: shmem_buffer.h:112
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:114
Mutable view of shared memory buffer.
Definition: shmem_buffer.h:90
Definition: shmem_exchange.h:44
void addPeer(Endpoint const &peer_endpoint) override
void start() override
Start running in a background thread.
void unregisterBuffer(IBufferMutable const &buffer) override
Unregister a previously shared mutable buffer.
Endpoint const & localEndpoint() const override
Local endpoint used by this exchange.
Definition: shmem_exchange.h:78
void listPorts(std::vector< EndpointPort > &ports) const override
List all ports this exchange has received from peers or shared locally.
void mapLocalToPeer(uint32_t local_port, Endpoint peer_endpoint, uint32_t peer_port=0) override
void listResolvedBuffers(std::map< PortIdx, std::shared_ptr< IBufferConst >> &buffers) const override
IBufferExchange::Endpoint Endpoint
Definition: shmem_exchange.h:46
ShmemExchange(ShmemExchange const &)=delete
bool stopped() const override
True if the exchange has been stopped.
void registerBuffer(IBufferMutable const &buffer) override
~ShmemExchange() override
Stops and releases socket/IO resources.
ShmemExchange & operator=(ShmemExchange &&)=delete
void stop() override
Request the event loop to stop.
ShmemExchange & operator=(ShmemExchange const &)=delete
ShmemExchange(ShmemExchange &&)=delete
std::shared_ptr< IBufferConst > get(uint32_t port, uint32_t idx) const override
ShmemExchange(Endpoint const &local_endpoint)
void dispose(std::shared_ptr< IBufferConst > &&buffer) override
Release resources associated with a previously retrieved buffer.
std::shared_ptr< IBufferConst > get(Endpoint const &ep, uint32_t port, uint32_t idx) const override
ShmemExchange(Endpoint const &local_endpoint, msg::ExchangeConfigT exchange_config)
void listMappings(std::map< uint32_t, std::vector< std::pair< Endpoint, uint32_t >>> &mappings) const override
void configure(msg::ExchangeConfigT const &config) override
auto access(TFunction &&cb) const
Shared/read-only access; executes cb(TData const&) under a shared_lock.
Definition: thread_safe_value.h:34
Definition: any_message_input.h:15
msg::ExchangeConfigT loadExchangeConfigFromPath(std::string const &config_path)
msg::ExchangeConfigT loadExchangeConfigFromEnv()
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:28
ExchangeCode
Definition: shmem_exchange.h:24
@ buffers_request
Definition: shmem_exchange.h:25
@ buffers_response
Definition: shmem_exchange.h:26
@ peer_disconnect
Definition: shmem_exchange.h:28
@ buffer_removed
Definition: shmem_exchange.h:27
ExchangeCode what
Definition: shmem_exchange.h:0
uint64_t peer_session_id
Definition: shmem_exchange.h:1
Definition: shmem_buffer.h:41
Definition: shmem_buffer.h:62
PortIdx port_idx
Definition: shmem_exchange.h:157
uint64_t generation
Definition: shmem_exchange.h:160
std::shared_ptr< IBufferConst > buffer
Definition: shmem_exchange.h:158
bool cache_needs_update
Definition: shmem_exchange.h:159
std::shared_ptr< IBufferConst > buffer
Definition: shmem_exchange.h:152
uint64_t generation
Definition: shmem_exchange.h:153