Skip to content

Commit 483911d

Browse files
committed
Update thread_safe_deque.hpp
Use more lockless sync. Fix some thread safety issues.
1 parent ba4c42e commit 483911d

1 file changed

Lines changed: 20 additions & 8 deletions

File tree

include/internal/thread_safe_deque.hpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,25 @@ namespace csv {
2424
ThreadSafeDeque(const ThreadSafeDeque& other) {
2525
this->data = other.data;
2626
this->_notify_size = other._notify_size;
27+
this->_is_empty.store(other._is_empty.load(std::memory_order_acquire), std::memory_order_release);
2728
}
2829

2930
ThreadSafeDeque(const std::deque<T>& source) : ThreadSafeDeque() {
3031
this->data = source;
32+
this->_is_empty.store(source.empty(), std::memory_order_release);
3133
}
3234

33-
void clear() noexcept { this->data.clear(); }
35+
void clear() noexcept {
36+
this->data.clear();
37+
this->_is_empty.store(true, std::memory_order_release);
38+
}
3439

3540
bool empty() const noexcept {
36-
return this->data.empty();
41+
return this->_is_empty.load(std::memory_order_acquire);
3742
}
3843

3944
T& front() noexcept {
45+
std::lock_guard<std::mutex> lock{ this->_lock };
4046
return this->data.front();
4147
}
4248

@@ -47,8 +53,9 @@ namespace csv {
4753
void push_back(T&& item) {
4854
std::lock_guard<std::mutex> lock{ this->_lock };
4955
this->data.push_back(std::move(item));
56+
this->_is_empty.store(false, std::memory_order_release);
5057

51-
if (this->size() >= _notify_size) {
58+
if (this->data.size() >= _notify_size) {
5259
this->_cond.notify_all();
5360
}
5461
}
@@ -57,11 +64,15 @@ namespace csv {
5764
std::lock_guard<std::mutex> lock{ this->_lock };
5865
T item = std::move(data.front());
5966
data.pop_front();
67+
68+
// Update empty flag if we just emptied the deque
69+
if (this->data.empty()) {
70+
this->_is_empty.store(true, std::memory_order_release);
71+
}
72+
6073
return item;
6174
}
6275

63-
size_t size() const noexcept { return this->data.size(); }
64-
6576
/** Returns true if a thread is actively pushing items to this deque */
6677
constexpr bool is_waitable() const noexcept { return this->_is_waitable; }
6778

@@ -72,7 +83,7 @@ namespace csv {
7283
}
7384

7485
std::unique_lock<std::mutex> lock{ this->_lock };
75-
this->_cond.wait(lock, [this] { return this->size() >= _notify_size || !this->is_waitable(); });
86+
this->_cond.wait(lock, [this] { return this->data.size() >= _notify_size || !this->is_waitable(); });
7687
lock.unlock();
7788
}
7889

@@ -99,9 +110,10 @@ namespace csv {
99110
}
100111

101112
private:
102-
std::atomic<bool> _is_waitable{ false };
113+
std::atomic<bool> _is_empty{ true }; // Lock-free empty() check
114+
std::atomic<bool> _is_waitable{ false }; // Lock-free is_waitable() check
103115
size_t _notify_size;
104-
std::mutex _lock;
116+
mutable std::mutex _lock;
105117
std::condition_variable _cond;
106118
std::deque<T> data;
107119
};

0 commit comments

Comments
 (0)