Skip to main content

StrictSync.h File

Strict multi-stream synchronization helpers (pending frame store, token store, release pacer). More...

Included Headers

#include "pipeline/SessionOptions.h"
#include "pipeline/Run.h"
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <unordered_map>
#include <vector>

Namespaces Index

namespace simaai
namespace neat
namespace graph
namespace strict_sync

Classes Index

class PendingVideoStore
struct PendingFrame
struct StreamStats
struct StreamState
class YoloTokenStore
struct Token
struct OrderedToken
struct Stats
struct State
class ReleasePacer
struct Stats
struct State

Description

Strict multi-stream synchronization helpers (pending frame store, token store, release pacer).

File Listing

The file content with the documentation metadata removed is:

1
7#pragma once
8
10#include "pipeline/Run.h"
11
12#include <atomic>
13#include <condition_variable>
14#include <cstddef>
15#include <cstdint>
16#include <deque>
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <optional>
21#include <thread>
22#include <unordered_map>
23#include <vector>
24
26
// PendingVideoStore: body of the class whose constructor is declared below.
// NOTE(review): the `class PendingVideoStore {` opener is not present in this
// listing (the listing numbers jump from 26 to 28); the name comes from the
// constructor declaration.
28public:
// One stored frame, as returned by take().
// NOTE(review): listing line 30 is missing, so at least one member is not
// shown here -- presumably the Sample handed to enqueue(); confirm against
// the real header.
29 struct PendingFrame {
// Defaults to -1. Presumably a capture timestamp in milliseconds -- TODO confirm.
31 int64_t cap_ms = -1;
// Defaults to 0. Presumably the size of the stored frame in bytes -- TODO confirm.
32 size_t bytes = 0;
33 };
34
// Per-stream counters, returned by value from stats(idx). Every field starts at 0.
// The exact meaning of each counter is defined by the implementation, which is
// not part of this listing.
35 struct StreamStats {
36 int64_t enqueued = 0;
37 int64_t matched = 0;
38 int64_t miss = 0;
39 size_t pending_depth = 0;
40 size_t pending_bytes = 0;
41 size_t max_pending_depth = 0;
42 size_t max_pending_bytes = 0;
43 };
44
// `explicit` prevents an implicit conversion from a bare size_t.
// `streams` is presumably the number of per-stream slots to create -- TODO confirm.
45 explicit PendingVideoStore(size_t streams);
46
// Takes the sample by rvalue reference, together with a stream index, a frame id,
// and the cap_ms / bytes values (same names as the PendingFrame fields).
// NOTE(review): the meaning of the bool result is not visible here.
47 bool enqueue(size_t idx, int64_t frame_id, simaai::neat::Sample&& sample, int64_t cap_ms,
48 size_t bytes);
49
// Returns an optional PendingFrame for (idx, frame_id); presumably empty when
// no frame with that id is stored for the stream -- TODO confirm.
50 std::optional<PendingFrame> take(size_t idx, int64_t frame_id);
51
// Const accessor returning a StreamStats copy for stream `idx`.
52 StreamStats stats(size_t idx) const;
53
54private:
// Per-stream storage.
// NOTE(review): listing line 60 is missing, so one member is not shown.
55 struct StreamState {
// `mutable` allows locking through a const StreamState, e.g. from the const
// stats() accessor (presumed use -- the implementation is not shown).
56 mutable std::mutex mu;
// Keyed by int64_t; presumably the frame_id passed to enqueue()/take().
57 std::unordered_map<int64_t, PendingFrame> pending;
// Presumably the frame ids in insertion order -- TODO confirm.
58 std::deque<int64_t> order;
59 size_t bytes_total = 0;
61 };
62
// StreamState contains a std::mutex, so it is neither copyable nor movable:
// this vector can be sized at construction but cannot grow afterwards
// (push_back/resize would require a move).
63 std::vector<StreamState> states_;
64};
65
// YoloTokenStore: body of the class whose constructor is declared below.
// NOTE(review): the `class YoloTokenStore {` opener is not present in this
// listing (the listing numbers jump from 65 to 67); the name comes from the
// constructor declaration.
67public:
// Token returned by take(idx): a frame id plus an enqueue time value.
68 struct Token {
// Defaults to -1.
69 int64_t frame_id = -1;
// Defaults to 0. Presumably a millisecond timestamp from now_ms_i64() -- TODO confirm.
70 int64_t enqueued_ms = 0;
71 };
72
// Token returned by take_ordered(): same fields as Token plus the stream index.
73 struct OrderedToken {
74 size_t stream_idx = 0;
75 int64_t frame_id = -1;
76 int64_t enqueued_ms = 0;
77 };
78
// Per-stream counters, returned by value from stats(idx). Every field starts at 0.
79 struct Stats {
80 int64_t enqueued = 0;
81 int64_t dequeued = 0;
82 int64_t miss = 0;
83 size_t depth = 0;
84 size_t max_depth = 0;
85 };
86
// `explicit` prevents an implicit conversion from a bare size_t.
// `streams` is presumably the number of per-stream queues to create -- TODO confirm.
87 explicit YoloTokenStore(size_t streams);
88
// Records `frame_id` for stream `idx`; returns nothing.
89 void enqueue(size_t idx, int64_t frame_id);
// Returns an optional OrderedToken; presumably the oldest token across all
// streams, or empty when none is queued -- TODO confirm.
90 std::optional<OrderedToken> take_ordered();
// Returns an optional Token for stream `idx`; presumably empty when that
// stream has nothing queued -- TODO confirm.
91 std::optional<Token> take(size_t idx);
// Const accessor returning a Stats copy for stream `idx`.
92 Stats stats(size_t idx) const;
93
94private:
// Per-stream state.
// NOTE(review): listing line 98 is missing, so one member is not shown.
95 struct State {
96 mutable std::mutex mu;
97 std::deque<Token> q;
99 };
100
// Static helper returning an int64_t; by its name, the current time in
// milliseconds (clock source not visible here).
101 static int64_t now_ms_i64();
102
// A separate mutex, distinct from the per-stream State::mu; presumably guards
// order_q_ -- TODO confirm.
103 mutable std::mutex order_mu_;
104 std::deque<OrderedToken> order_q_;
// State contains a std::mutex, so it is neither copyable nor movable: this
// vector can be sized at construction but cannot grow afterwards.
105 std::vector<State> states_;
106};
107
// ReleasePacer: body of the class whose constructor is declared below.
// NOTE(review): the `class ReleasePacer {` opener is not present in this
// listing (the listing numbers jump from 107 to 109); the name comes from the
// constructor declaration.
109public:
// Per-stream counters, returned by value from stats(idx). Every field starts at 0.
// The field names mirror the counters kept in the private State struct.
110 struct Stats {
111 int64_t enqueued = 0;
112 int64_t sent = 0;
113 int64_t dropped = 0;
114 int64_t send_fail = 0;
115 int64_t max_queue_depth = 0;
116 };
117
// Callback signatures. Both are optional: the constructor defaults them to
// empty std::function objects.
118 using OnSendResult = std::function<void(size_t /*stream_idx*/, bool /*ok*/)>;
119 using OnDrop = std::function<void(size_t /*stream_idx*/, int64_t /*dropped_count*/)>;
120
// Takes a vector of shared Run handles, a target frame rate, a queue bound,
// and the two optional callbacks.
// NOTE(review): how target_fps maps to interval_ms_ is defined in the
// implementation, which is not shown. Listing line 123 (directly after this
// declaration) is missing -- presumably a destructor; confirm against the header.
121 ReleasePacer(const std::vector<std::shared_ptr<simaai::neat::Run>>& runs, int target_fps,
122 size_t max_queue, OnSendResult on_send_result = {}, OnDrop on_drop = {});
124
// True when a positive interval is configured. interval_ms_ defaults to 0,
// so this is false unless the constructor sets a positive value.
125 bool enabled() const {
126 return interval_ms_ > 0;
127 }
// Returns the stored interval_ms_ value.
128 int64_t interval_ms() const {
129 return interval_ms_;
130 }
// Returns the stored max_queue_ value.
131 size_t max_queue() const {
132 return max_queue_;
133 }
134
// Takes a sample by rvalue reference for stream `idx`.
// NOTE(review): the meaning of the bool result is not visible here.
135 bool enqueue(size_t idx, simaai::neat::Sample&& sample);
// Presumably signals the worker threads to exit and joins them -- TODO confirm.
136 void stop();
// Const accessor returning a Stats copy for stream `idx`.
137 Stats stats(size_t idx) const;
138
139private:
// Per-stream state: a mutex and condition variable, a sample queue, a worker
// thread, a stop flag, and the counters reported through Stats.
140 struct State {
141 mutable std::mutex mu;
142 std::condition_variable cv;
143 std::deque<simaai::neat::Sample> queue;
144 std::thread worker;
145 bool stop = false;
146
// Defaults to -1; presumably "no release scheduled yet" -- TODO confirm.
147 int64_t next_release_ms = -1;
148 int64_t enqueued = 0;
149 int64_t sent = 0;
150 int64_t dropped = 0;
151 int64_t send_fail = 0;
152 int64_t max_queue_depth = 0;
153 };
154
// Static helper returning an int64_t; by its name, the current time in
// milliseconds (clock source not visible here).
155 static int64_t now_ms_i64();
// Presumably the body run by State::worker for stream `idx` -- TODO confirm.
156 void worker_loop(size_t idx);
157
158 std::vector<std::shared_ptr<simaai::neat::Run>> runs_;
// State is held through unique_ptr, so the vector's elements are movable even
// though State itself (mutex, condition_variable) is not.
159 std::vector<std::unique_ptr<State>> states_;
160 OnSendResult on_send_result_;
161 OnDrop on_drop_;
162
// 0 by default, which makes enabled() return false.
163 int64_t interval_ms_ = 0;
164 size_t max_queue_ = 0;
// Starts false. std::atomic is neither copyable nor movable, so ReleasePacer
// has no implicit copy or move operations.
165 std::atomic<bool> stopped_{false};
166};
167
168} // namespace simaai::neat::graph::strict_sync

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.1.