Skip to main content

GraphRun.h File

Runtime handle for a compiled hybrid graph. More...

Included Headers

#include "graph/Graph.h" #include "pipeline/Run.h" #include <cstdint> #include <chrono> #include <functional> #include <memory> #include <mutex> #include <optional> #include <string> #include <unordered_map> #include <unordered_set> #include <vector>

Namespaces Index

namespacesimaai
namespaceneat
namespacegraph

Classes Index

structGraphRunOptions
structGraphRunStats
structStreamStat
structNodeStat
structSnapshot
structGraphRunPullOptions
classGraphRun
classInput
classOutput
classStallGuard
classPullSession

Description

Runtime handle for a compiled hybrid graph.

File Listing

The file content with the documentation metadata removed is:

1
6#pragma once
7
8#include "graph/Graph.h"
9#include "pipeline/Run.h"
10
11#include <cstdint>
12#include <chrono>
13#include <functional>
14#include <memory>
15#include <mutex>
16#include <optional>
17#include <string>
18#include <unordered_map>
19#include <unordered_set>
20#include <vector>
21
22namespace simaai::neat::graph {
23
25 // Bounded capacity for graph edge/stage/sink queues. 0 means unbounded.
26 std::size_t edge_queue = 256;
27 // Maximum wait to enqueue before failing fast with a backpressure error.
28 int push_timeout_ms = 5000;
29 // Poll timeout for pop/pull loops.
32};
33
35 struct StreamStat {
36 int64_t count = 0;
37 std::chrono::steady_clock::time_point first{};
38 std::chrono::steady_clock::time_point last{};
39 bool initialized = false;
40 };
41
42 struct NodeStat {
43 int64_t total = 0;
44 std::chrono::steady_clock::time_point first{};
45 std::chrono::steady_clock::time_point last{};
46 bool initialized = false;
47 std::unordered_map<std::string, StreamStat> streams;
48 };
49
50 struct Snapshot {
51 NodeId node_id = kInvalidNode;
52 int64_t total = 0;
53 std::chrono::steady_clock::time_point first{};
54 std::chrono::steady_clock::time_point last{};
55 std::unordered_map<std::string, int64_t> counts;
56 std::unordered_map<std::string, std::chrono::steady_clock::time_point> last_seen;
57 };
58
59 void record(NodeId node_id, const Sample& sample);
60 std::vector<Snapshot> snapshot() const;
61 bool empty() const;
62 std::unordered_map<std::string, int64_t>
63 stream_counts(const std::vector<NodeId>& nodes = {}) const;
64 bool has_missing_streams(const std::unordered_set<std::string>& expected,
65 const std::vector<NodeId>& nodes = {}) const;
66 std::string missing_streams_list(const std::unordered_set<std::string>& expected,
67 const std::vector<NodeId>& nodes = {}) const;
68
69private:
70 mutable std::mutex mu_;
71 std::unordered_map<NodeId, NodeStat> nodes_;
72};
73
76 int stall_ms = 0;
77 int timeout_ms = 50;
78 int max_runtime_ms = -1;
79 std::vector<std::string> stream_ids;
80};
81
82class GraphRun {
83public:
84 class Input {
85 public:
86 bool push(const Sample& sample) const;
87
88 private:
89 friend class GraphRun;
90 Input(GraphRun* run, NodeId node, PortId port, bool has_port)
91 : run_(run), node_(node), port_(port), has_port_(has_port) {}
92
93 GraphRun* run_ = nullptr;
94 NodeId node_ = kInvalidNode;
95 PortId port_ = kInvalidPort;
96 bool has_port_ = false;
97 };
98
99 class Output {
100 public:
101 std::optional<Sample> pull(int timeout_ms = -1, GraphRunStats* stats = nullptr) const;
102 Sample pull_or_throw(int timeout_ms = -1, GraphRunStats* stats = nullptr) const;
103 NodeId node_id() const {
104 return node_;
105 }
106
107 private:
108 friend class GraphRun;
109 Output(GraphRun* run, NodeId node) : run_(run), node_(node) {}
110
111 GraphRun* run_ = nullptr;
112 NodeId node_ = kInvalidNode;
113 };
114
115 class StallGuard {
116 public:
118 bool done() const {
119 return done_;
120 }
121 bool stalled() const {
122 return stalled_;
123 }
124 int64_t target_progress() const {
125 return target_progress_;
126 }
127
128 private:
129 friend class GraphRun;
130 StallGuard(std::vector<NodeId> nodes, std::vector<std::string> streams, int per_stream_target,
131 int stall_ms);
132
133 std::vector<NodeId> nodes_;
134 std::vector<std::string> streams_;
135 int per_stream_target_ = 0;
136 int stall_ms_ = 0;
137 bool initialized_ = false;
138 bool done_ = false;
139 bool stalled_ = false;
140 int64_t target_progress_ = 0;
141 std::chrono::steady_clock::time_point last_progress{};
142 };
143
145 public:
150 PullSession& expect_streams(std::vector<std::string> ids);
151 PullSession& on_sample(std::function<void(const Sample&, NodeId)> cb);
153 void run();
154
155 private:
156 friend class GraphRun;
157 PullSession(GraphRun* run, const std::vector<Output>* outputs, std::vector<NodeId> output_nodes,
159
160 GraphRun* run_ = nullptr;
161 const std::vector<Output>* outputs_ = nullptr;
162 std::vector<NodeId> output_nodes_;
163 GraphRunStats* stats_ = nullptr;
165 std::unordered_set<std::string> expected_;
166 std::unordered_set<std::string> unknown_;
167 bool saw_empty_stream_id_ = false;
168 std::function<void(const Sample&, NodeId)> on_sample_;
169 };
170
171 GraphRun() = default;
172 GraphRun(const GraphRun&) = delete;
173 GraphRun& operator=(const GraphRun&) = delete;
174
175 GraphRun(GraphRun&&) noexcept;
176 GraphRun& operator=(GraphRun&&) noexcept;
178
179 explicit operator bool() const noexcept;
180 bool running() const;
181
182 bool push(NodeId node_id, const Sample& sample);
183 bool push(NodeId node_id, PortId port, const Sample& sample);
184
185 std::optional<Sample> pull(NodeId node_id, int timeout_ms = -1);
186
187 Input input(NodeId node_id);
188 Input input(NodeId node_id, PortId port);
190
192 const GraphRunStats* stats() const;
193 PullSession collect(const std::vector<Output>& outputs, GraphRunStats* stats = nullptr);
194
195 std::optional<Sample> pull_any(const std::vector<Output>& outputs, int timeout_ms = -1,
196 GraphRunStats* stats = nullptr, NodeId* out_node = nullptr);
197 bool warmup(const std::vector<Output>& outputs, int warmup_count, int timeout_ms = -1);
198 void pull_until(const std::vector<Output>& outputs, GraphRunStats& stats,
199 const GraphRunPullOptions& opt,
200 const std::function<void(const Sample&, NodeId)>& on_sample = {});
201 StallGuard stall_guard(const std::vector<Output>& outputs, int per_stream_target, int stall_ms,
202 std::vector<std::string> stream_ids = {});
204 void emit_rate_summary() const;
206 void emit_stream_summary() const;
207 void emit_summary(const GraphRunStats& stats) const;
208 void emit_summary() const;
209
210 std::string describe() const;
211
212 void stop();
213 std::string last_error() const;
214 void last_error_or_throw() const;
215
216private:
217 struct State;
218 std::shared_ptr<State> state_;
219
220 explicit GraphRun(std::shared_ptr<State> state);
221 friend class GraphSession;
222};
223
224} // namespace simaai::neat::graph

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.1.