Skip to main content

JoinBundle.h File

Stage executor that joins multiple inputs into a Bundle sample. More...

Included Headers

#include "graph/StageExecutor.h"
#include "graph/nodes/StageNode.h"
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

Namespaces Index

namespace simaai
namespace neat
namespace graph
namespace nodes

Classes Index

struct JoinBundleOptions
class JoinBundle
struct Pending

Description

Stage executor that joins multiple inputs into a Bundle sample.

File Listing

The file content with the documentation metadata removed is:

1
6#pragma once
7
10
11#include <cstddef>
12#include <cstdint>
13#include <deque>
14#include <memory>
15#include <string>
16#include <unordered_map>
17#include <unordered_set>
18#include <utility>
19#include <vector>
20
22
// Scoped enumeration of join-key policies. Only the StreamFrame enumerator
// (value 0) is visible here; listing line 25 appears to have been removed
// together with the documentation metadata, so its content cannot be seen.
// NOTE(review): presumably selects how the join key is derived from a sample
// (stream + frame identity) -- confirm against the full header.
23enum class JoinKeyPolicy {
24 StreamFrame = 0,
26};
27
// Fields of the join options aggregate. The opening `struct` line is not shown
// in this listing; NOTE(review): presumably this is JoinBundleOptions, the type
// taken by JoinBundleNode() -- confirm against the full header.
//
// inputs: names of the inputs to be joined.
// NOTE(review): presumably these are input port names -- confirm.
29 std::vector<std::string> inputs;
// required: subset of input names that must be present.
30 std::unordered_set<std::string> required; // default: all inputs
// emit_partial: off by default.
// NOTE(review): presumably permits emitting a bundle that lacks some inputs -- confirm.
32 bool emit_partial = false;
// max_pending_keys: defaults to 4096.
// NOTE(review): presumably caps the number of keys held while waiting for
// their remaining inputs -- confirm.
33 std::size_t max_pending_keys = 4096;
// timeout_ms: timeout in milliseconds; the default 0 disables timeout eviction.
34 int timeout_ms = 0; // 0 => no timeout eviction
35};
36
38public:
40
// Public overrides of the stage-executor interface (the base class declaration
// is not visible in this listing).
//
// set_ports: receives the stage's port description.
// NOTE(review): presumably resolves input names to PortIds and records the
// output port -- confirm in the implementation.
41 void set_ports(const StagePorts& ports) override;
// on_input: takes ownership of one incoming message (rvalue reference) and
// may append resulting messages to `out`.
42 void on_input(StageMsg&& msg, std::vector<StageOutMsg>& out) override;
// on_tick: periodic callback given the current time in nanoseconds; may append
// messages to `out`.
// NOTE(review): presumably drives timeout eviction -- confirm.
43 void on_tick(std::int64_t now_ns, std::vector<StageOutMsg>& out) override;
44
45private:
// Accumulation state stored per join key: the samples collected so far, indexed
// by the port they arrived on, plus a nanosecond timestamp that starts at 0.
// NOTE(review): last_seen_ns is presumably refreshed on every arrival and used
// for timeout eviction -- confirm in the implementation.
46 struct Pending {
47 std::unordered_map<PortId, Sample> samples;
48 std::int64_t last_seen_ns = 0;
49 };
50
// Private helpers. Only declarations are visible here; the behavioral notes
// below are inferred from names and signatures and should be confirmed against
// the implementation file.
//
// make_key_: builds the string join key for a sample without modifying the executor.
51 std::string make_key_(const Sample& sample) const;
// touch_key_: NOTE(review): presumably records `key` as recently used -- confirm.
52 void touch_key_(const std::string& key);
// evict_expired_: NOTE(review): presumably drops entries older than the
// configured timeout relative to now_ns -- confirm.
53 void evict_expired_(std::int64_t now_ns);
// evict_oldest_: NOTE(review): presumably drops the oldest pending key when
// the pending-key limit is exceeded -- confirm.
54 void evict_oldest_();
// ready_: const predicate over one pending entry.
// NOTE(review): presumably true once all required ports have a sample -- confirm.
55 bool ready_(const Pending& pending) const;
// erase_key_: NOTE(review): presumably removes `key` from all bookkeeping -- confirm.
56 void erase_key_(const std::string& key);
57
// Data members. Listing line 58 is absent from this view, so another member
// may be declared before these.
//
// input_names_: input names held as strings.
59 std::vector<std::string> input_names_;
// port_names_ / name_to_port_: lookup tables in both directions between PortId and name.
60 std::unordered_map<PortId, std::string> port_names_;
61 std::unordered_map<std::string, PortId> name_to_port_;
// required_ports_: set of PortIds.
// NOTE(review): presumably the ports that must be filled before a bundle is ready -- confirm.
62 std::unordered_set<PortId> required_ports_;
// pending_: per-key accumulation state, keyed by a string key.
63 std::unordered_map<std::string, Pending> pending_;
// order_: double-ended queue of string keys.
// NOTE(review): presumably the age/recency order used for eviction -- confirm.
64 std::deque<std::string> order_;
// out_port_: output port id; holds kInvalidPort until assigned.
65 PortId out_port_ = kInvalidPort;
66};
67
68// Convenience: wrap JoinBundle in a StageNode.
// Parameters:
//   inputs - input names to join, taken by value.
//   label  - optional label; defaults to an empty string.
//   output - output name; defaults to "bundle".
//   opt    - join options; defaults to a value-initialized JoinBundleOptions.
// Returns a shared pointer to the created graph Node.
69std::shared_ptr<simaai::neat::graph::Node> JoinBundleNode(std::vector<std::string> inputs,
70 std::string label = {},
71 std::string output = "bundle",
72 JoinBundleOptions opt = {});
73
74} // namespace simaai::neat::graph::nodes

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.1.