Skip to main content

BlockingQueue.h File

Simple bounded blocking queue for graph runtime. More...

Included Headers

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

Namespaces Index

namespace simaai
namespace neat
namespace graph
namespace runtime

Classes Index

class BlockingQueue<T>

Description

Simple bounded blocking queue for graph runtime.

File Listing

The file content with the documentation metadata removed is:

1
6#pragma once
7
8#include <chrono>
9#include <condition_variable>
10#include <cstddef>
11#include <deque>
12#include <mutex>
13#include <utility>
14
16
/// @brief Mutex-protected FIFO queue with blocking, timed and non-blocking access.
///
/// Every public member function acquires the internal mutex, so a single
/// instance may be shared between producer and consumer threads.
///
/// A capacity of 0 means "unbounded": pushes never wait for free space.
/// Once close() has been called, all pushes fail, while pop() keeps returning
/// the items that were already queued until the queue is drained.
///
/// @tparam T Element type. Only the overloads that are actually called are
///           instantiated, so move-only types work with the rvalue overloads.
template <class T> class BlockingQueue {
public:
  /// @brief Creates an open, empty queue.
  /// @param capacity Maximum number of queued items; 0 disables the bound.
  explicit BlockingQueue(std::size_t capacity = 0) : capacity_(capacity) {}

  /// @brief Appends a copy of @p item, waiting for free space if the queue is bounded and full.
  /// @param item       Value to copy into the queue.
  /// @param timeout_ms Negative: wait indefinitely. Otherwise: maximum wait in milliseconds.
  /// @return true if the item was enqueued; false if the queue is (or became)
  ///         closed, or if the timeout expired while the queue was still full.
  bool push(const T& item, int timeout_ms = -1) { return push_impl(item, timeout_ms); }

  /// @brief Appends @p item by move, waiting for free space if the queue is bounded and full.
  /// @param item       Value to move into the queue; left untouched when false is returned.
  /// @param timeout_ms Negative: wait indefinitely. Otherwise: maximum wait in milliseconds.
  /// @return true if the item was enqueued; false if the queue is (or became)
  ///         closed, or if the timeout expired while the queue was still full.
  bool push(T&& item, int timeout_ms = -1) { return push_impl(std::move(item), timeout_ms); }

  /// @brief Appends a copy of @p item without ever waiting.
  /// @return true if the item was enqueued; false if the queue is closed or full.
  bool try_push(const T& item) { return try_push_impl(item); }

  /// @brief Appends @p item by move without ever waiting.
  /// @param item Value to move into the queue; left untouched when false is returned.
  /// @return true if the item was enqueued; false if the queue is closed or full.
  bool try_push(T&& item) { return try_push_impl(std::move(item)); }

  /// @brief Removes the oldest item, waiting until one is available or the queue is closed.
  /// @param out        Receives the item (move-assigned); not modified when false is returned.
  /// @param timeout_ms Negative: wait indefinitely. Otherwise: maximum wait in milliseconds.
  /// @return true if an item was stored in @p out; false if the queue was still
  ///         empty once the wait ended (timeout, or closed and drained).
  bool pop(T& out, int timeout_ms = -1) {
    std::unique_lock<std::mutex> lock(mu_);
    const auto readable = [this] { return closed_ || !queue_.empty(); };
    if (timeout_ms < 0) {
      cv_not_empty_.wait(lock, readable);
    } else {
      // The wait result is irrelevant: emptiness is re-checked right below.
      cv_not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), readable);
    }
    if (queue_.empty())
      return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    cv_not_full_.notify_one();
    return true;
  }

  /// @brief Marks the queue as closed and wakes every waiting producer and consumer.
  void close() {
    std::lock_guard<std::mutex> lock(mu_);
    closed_ = true;
    cv_not_empty_.notify_all();
    cv_not_full_.notify_all();
  }

  /// @brief Reports whether close() has been called.
  bool closed() const {
    std::lock_guard<std::mutex> lock(mu_);
    return closed_;
  }

  /// @brief Returns the number of items queued at the moment of the call.
  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return queue_.size();
  }

private:
  /// Shared body of both push() overloads; U is deduced so the item is copied or moved as passed.
  template <class U> bool push_impl(U&& item, int timeout_ms) {
    std::unique_lock<std::mutex> lock(mu_);
    if (closed_)
      return false;
    if (capacity_ > 0) {
      const auto writable = [this] { return closed_ || queue_.size() < capacity_; };
      if (timeout_ms < 0) {
        cv_not_full_.wait(lock, writable);
      } else if (!cv_not_full_.wait_for(lock, std::chrono::milliseconds(timeout_ms), writable)) {
        return false; // timed out while still full
      }
      // The predicate held when the wait ended, so the only remaining reason
      // to refuse the item is that the queue was closed in the meantime.
      if (closed_)
        return false;
    }
    queue_.push_back(std::forward<U>(item));
    cv_not_empty_.notify_one();
    return true;
  }

  /// Shared body of both try_push() overloads; U is deduced so the item is copied or moved as passed.
  template <class U> bool try_push_impl(U&& item) {
    std::lock_guard<std::mutex> lock(mu_);
    if (closed_)
      return false;
    if (capacity_ > 0 && queue_.size() >= capacity_)
      return false;
    queue_.push_back(std::forward<U>(item));
    cv_not_empty_.notify_one();
    return true;
  }

  mutable std::mutex mu_;                // guards every member below
  std::condition_variable cv_not_empty_; // signalled on push and on close
  std::condition_variable cv_not_full_;  // signalled on pop and on close
  std::deque<T> queue_;
  std::size_t capacity_ = 0;             // 0 = unbounded
  bool closed_ = false;
};
122
123} // namespace simaai::neat::graph::runtime

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.1.