5 #include "sol3/core/msg/exchange_config.h"
9 #include <boost/asio.hpp>
10 #include <boost/asio/local/datagram_protocol.hpp>
19 #include <unordered_set>
57 Endpoint const& local_endpoint, msg::ExchangeConfigT exchange_config);
74 uint32_t peer_port = 0)
override;
/// Overrides the base-interface `configure`. The configuration is taken by
/// const reference, so this call does not modify the caller's object.
/// NOTE(review): how this relates to the configuration accepted by the
/// two-argument constructor is not visible in this listing — confirm in the
/// implementation.
75 void configure(msg::ExchangeConfigT
const& config)
override;
/// Returns a `std::shared_ptr<IBufferConst>` for the given (port, idx) pair.
/// Const: callable on a const exchange.
/// NOTE(review): presumably returns an empty pointer when no buffer matches —
/// confirm against the implementation.
79 std::shared_ptr<IBufferConst>
get(uint32_t port, uint32_t idx)
const override;
/// Overload of `get` that additionally takes an endpoint `ep`.
/// NOTE(review): presumably restricts the lookup to buffers associated with
/// that endpoint, and returns an empty pointer on a miss — confirm.
80 std::shared_ptr<IBufferConst>
get(
81 Endpoint const& ep, uint32_t port, uint32_t idx)
const override;
/// Release resources associated with a previously retrieved buffer.
/// The pointer is taken by rvalue reference, so callers pass a temporary or
/// use `std::move` on their handle.
82 void dispose(std::shared_ptr<IBufferConst>&& buffer)
override;
/// List all ports this exchange has received from peers or shared locally.
/// Results are delivered through the `ports` out-parameter.
/// NOTE(review): whether `ports` is cleared first or appended to is not
/// visible here — confirm.
83 void listPorts(std::vector<EndpointPort>& ports)
const override;
85 std::map<uint32_t, std::vector<std::pair<Endpoint, uint32_t>>>& mappings)
88 std::map<
PortIdx, std::shared_ptr<IBufferConst>>& buffers)
const override;
91 struct ExchangePacket {
94 } __attribute__((packed));
/// Size, in chars, of the fixed receive buffer; the message handlers declared
/// in this class take `std::array<char, ShmemExchange::kMaxRecvBuffer>`.
96 static size_t constexpr kMaxRecvBuffer = 8192;
/// Equal to `sizeof(uint64_t)`.
/// NOTE(review): which payload this bounds is not visible here — confirm.
97 static size_t constexpr kMaxPayload =
sizeof(uint64_t);
/// NOTE(review): presumably the number of polls a peer may miss before it is
/// evicted — confirm against the code that uses it.
98 static size_t constexpr kMaxMissedPollsBeforeEvict = 4;
/// Number of whole `BufferInfo` records that fit in the space of one receive
/// buffer left over after one `ExchangePacket` (integer division, so any
/// remainder is discarded).
99 static size_t constexpr kMaxBuffers =
100 (kMaxRecvBuffer -
sizeof(ExchangePacket)) /
sizeof(BufferInfo);
/// 64-bit unsigned identifier type used for peer sessions.
103 using PeerSessionId = uint64_t;
/// Boost.Asio local (UNIX-domain) datagram socket type.
105 using Socket = boost::asio::local::datagram_protocol::socket;
/// Ordered map from `PortIdx` to a shared `IBufferConst` handle.
106 using ShmemBufferMap = std::map<PortIdx, std::shared_ptr<IBufferConst>>;
/// Buffers keyed by `PortIdx`.
/// NOTE(review): presumably the locally registered buffers offered to peers —
/// confirm.
112 ShmemBufferMap buffers_to_share;
/// Sequence of peer endpoints.
/// NOTE(review): presumably those added through addPeer() — confirm.
115 std::vector<Endpoint> peer_endpoints;
/// Session id stored per peer endpoint.
118 std::map<Endpoint, PeerSessionId> peer_session_ids;
/// Per-endpoint set of `PortIdx` values.
/// NOTE(review): presumably the buffers already sent to each peer — confirm.
121 std::map<Endpoint, std::unordered_set<PortIdx>> sent_buffers_per_peer;
126 struct ReceivedState {
130 std::map<Endpoint, PeerSessionId> peer_session_ids;
134 std::set<Endpoint> peer_endpoints;
139 std::map<Endpoint, ShmemBufferMap> buffers_per_peer;
143 std::map<Endpoint, uint32_t> peer_missed_polls;
148 std::map<uint32_t, std::vector<std::pair<Endpoint, uint32_t>>>
149 local_port_to_peer_ports;
164 std::map<PortIdx, ResolvedCacheEntry> resolved_buffers;
167 uint64_t resolve_generation = 0;
/// NOTE(review): presumably advances `resolve_generation` so that cached
/// `resolved_buffers` entries are recognised as stale — confirm in the
/// implementation; nothing in this listing shows the body.
170 void bumpResolvedGeneration();
175 std::shared_ptr<IBufferConst> resolveFromPortMappings(
176 uint32_t mapping_local_port,
177 uint32_t requested_local_port,
/// Const lookup that returns a `std::shared_ptr<IBufferConst>` for the given
/// local `PortIdx`.
/// NOTE(review): presumably resolves through the local-port-to-peer-port
/// mappings and may return an empty pointer — confirm.
183 std::shared_ptr<IBufferConst> resolveEntry(
PortIdx local_port_idx)
const;
/// Non-virtual `get` with the same parameters as the public
/// `get(ep, port, idx)` override (note: no `override` specifier here).
/// NOTE(review): presumably the lookup that the public override forwards to —
/// confirm.
192 std::shared_ptr<IBufferConst>
get(
193 Endpoint const& ep, uint32_t port, uint32_t idx)
const;
196 template <
typename TFunction>
197 auto accessReceivedState(TFunction&& cb)
const {
198 return received_state_.
access(std::forward<TFunction>(cb));
/// NOTE(review): presumably arms `peer_poll_timer_` for the next peer poll —
/// confirm.
205 void startPeerPoll();
/// NOTE(review): presumably receives a datagram and dispatches it to the
/// `handle*` methods declared in this class — confirm.
206 void handleReceive();
/// Takes the endpoint of the sender by const reference.
/// NOTE(review): presumably replies to that endpoint with locally shared
/// buffers — confirm.
207 void sendSomeBuffers(
Endpoint const& sender_ep);
/// Takes a peer endpoint by const reference.
/// NOTE(review): presumably sends a `buffers_request` to that peer — confirm.
208 void requestPeerBuffers(
Endpoint const& ep);
209 bool sendPacketOnStrand(
212 uint8_t
const* payload =
nullptr,
213 size_t payload_size = 0);
214 void sendExchangePacket(
217 void const* payload =
nullptr,
218 size_t payload_size = 0);
220 void handleBufferRequest(
222 std::array<char, ShmemExchange::kMaxRecvBuffer>
const& msgbuf);
223 void handleIncomingBuffers(
225 std::array<char, ShmemExchange::kMaxRecvBuffer>
const& msgbuf,
226 const struct msghdr& msgh);
227 void handleBufferRemoved(
229 std::array<char, ShmemExchange::kMaxRecvBuffer>
const& msgbuf);
230 void handlePeerDisconnect(
232 std::array<char, ShmemExchange::kMaxRecvBuffer>
const& msgbuf);
/// NOTE(review): presumably notifies peers with the `peer_disconnect`
/// exchange code — confirm in the implementation.
234 void sendDisconnect();
/// Session id held by this exchange object.
/// NOTE(review): how the value is chosen is not visible here — confirm.
236 PeerSessionId peer_session_id_;
/// Boost.Asio I/O context owned by this object.
237 boost::asio::io_context ctx_;
/// Boost.Asio strand. Declared after `ctx_`; members are initialised in
/// declaration order, which matters if the strand is constructed from `ctx_`.
238 boost::asio::io_context::strand strand_;
/// Steady-clock timer.
/// NOTE(review): presumably drives the periodic peer poll — confirm.
239 boost::asio::steady_timer peer_poll_timer_;
/// Atomic stop flag, initialised to false.
244 std::atomic<bool> stop_requested_{
false};
/// Thread handle.
/// NOTE(review): presumably the background thread launched by start() —
/// confirm.
246 std::thread run_thread_;
Interface for registering, discovering, and disposing shared buffers.
Definition: shmem_buffer.h:112
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:114
Mutable view of shared memory buffer.
Definition: shmem_buffer.h:90
Definition: shmem_exchange.h:44
void addPeer(Endpoint const &peer_endpoint) override
void start() override
Start running in a background thread.
void unregisterBuffer(IBufferMutable const &buffer) override
Unregister a previously shared mutable buffer.
Endpoint const & localEndpoint() const override
Local endpoint used by this exchange.
Definition: shmem_exchange.h:78
void listPorts(std::vector< EndpointPort > &ports) const override
List all ports this exchange has received from peers or shared locally.
void mapLocalToPeer(uint32_t local_port, Endpoint peer_endpoint, uint32_t peer_port=0) override
void listResolvedBuffers(std::map< PortIdx, std::shared_ptr< IBufferConst >> &buffers) const override
IBufferExchange::Endpoint Endpoint
Definition: shmem_exchange.h:46
ShmemExchange(ShmemExchange const &)=delete
bool stopped() const override
True if the exchange has been stopped.
void registerBuffer(IBufferMutable const &buffer) override
~ShmemExchange() override
Stops and releases socket/IO resources.
ShmemExchange & operator=(ShmemExchange &&)=delete
void stop() override
Request the event loop to stop.
ShmemExchange & operator=(ShmemExchange const &)=delete
ShmemExchange(ShmemExchange &&)=delete
std::shared_ptr< IBufferConst > get(uint32_t port, uint32_t idx) const override
ShmemExchange(Endpoint const &local_endpoint)
void dispose(std::shared_ptr< IBufferConst > &&buffer) override
Release resources associated with a previously retrieved buffer.
std::shared_ptr< IBufferConst > get(Endpoint const &ep, uint32_t port, uint32_t idx) const override
ShmemExchange(Endpoint const &local_endpoint, msg::ExchangeConfigT exchange_config)
void listMappings(std::map< uint32_t, std::vector< std::pair< Endpoint, uint32_t >>> &mappings) const override
void configure(msg::ExchangeConfigT const &config) override
auto access(TFunction &&cb) const
Shared/read-only access; executes cb(TData const&) under a shared_lock.
Definition: thread_safe_value.h:34
Definition: any_message_input.h:15
msg::ExchangeConfigT loadExchangeConfigFromPath(std::string const &config_path)
msg::ExchangeConfigT loadExchangeConfigFromEnv()
boost::asio::local::datagram_protocol::endpoint Endpoint
Definition: shmem_buffer.h:28
ExchangeCode
Definition: shmem_exchange.h:24
@ buffers_request
Definition: shmem_exchange.h:25
@ buffers_response
Definition: shmem_exchange.h:26
@ peer_disconnect
Definition: shmem_exchange.h:28
@ buffer_removed
Definition: shmem_exchange.h:27
ExchangeCode what
Definition: shmem_exchange.h:0
uint64_t peer_session_id
Definition: shmem_exchange.h:1
Definition: shmem_buffer.h:41
Definition: shmem_buffer.h:62
Definition: shmem_exchange.h:156
PortIdx port_idx
Definition: shmem_exchange.h:157
uint64_t generation
Definition: shmem_exchange.h:160
std::shared_ptr< IBufferConst > buffer
Definition: shmem_exchange.h:158
bool cache_needs_update
Definition: shmem_exchange.h:159
Definition: shmem_exchange.h:151
std::shared_ptr< IBufferConst > buffer
Definition: shmem_exchange.h:152
uint64_t generation
Definition: shmem_exchange.h:153