Skip to content

Commit 3596f1a

Browse files
committed
Remove some unnecessary locks in ThreadSafeDeque
1 parent 23f8999 commit 3596f1a

5 files changed

Lines changed: 542 additions & 234 deletions

File tree

include/internal/raw_csv_data.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ namespace csv {
2323
_current_block++;
2424
}
2525

26+
assert(_current_block < _block_capacity);
27+
2628
std::unique_ptr<RawCSVField[]> block(new RawCSVField[_single_buffer_capacity]);
2729
RawCSVField* block_ptr = block.get();
2830
this->_owned_blocks.push_back(std::move(block));

include/internal/raw_csv_data.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#pragma once
1111
#include <atomic>
12+
#include <cassert>
1213
#include <memory>
1314
#include <mutex>
1415
#include <unordered_map>
@@ -108,6 +109,7 @@ namespace csv {
108109
this->allocate();
109110
}
110111

112+
assert(_back != nullptr);
111113
*(_back++) = RawCSVField(std::forward<Args>(args)...);
112114
_current_buffer_size++;
113115
}

include/internal/thread_safe_deque.hpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ namespace csv {
1515
namespace internals {
1616
/** A std::deque wrapper which allows multiple read and write threads to concurrently
1717
* access it along with providing read threads the ability to wait for the deque
18-
* to become populated
18+
* to become populated.
19+
*
20+
* Concurrency strategy: writer-side mutations (push_back/pop_front) are locked;
21+
* hot-path flags (empty/is_waitable) are atomic; operator[] and iterators are
22+
* not synchronized and must not run concurrently with writers.
1923
*/
2024
template<typename T>
2125
class ThreadSafeDeque {
@@ -32,11 +36,6 @@ namespace csv {
3236
this->_is_empty.store(source.empty(), std::memory_order_release);
3337
}
3438

35-
void clear() noexcept {
36-
this->data.clear();
37-
this->_is_empty.store(true, std::memory_order_release);
38-
}
39-
4039
bool empty() const noexcept {
4140
return this->_is_empty.load(std::memory_order_acquire);
4241
}
@@ -46,6 +45,11 @@ namespace csv {
4645
return this->data.front();
4746
}
4847

48+
/** NOTE: operator[] is not synchronized.
49+
* Only call when no concurrent push_back/pop_front can occur.
50+
* std::deque can reallocate its internal map on push_back, which
51+
* makes concurrent operator[] access undefined behavior.
52+
*/
4953
T& operator[](size_t n) {
5054
return this->data[n];
5155
}
@@ -102,15 +106,13 @@ namespace csv {
102106

103107
/** Tell listeners that this deque is actively being pushed to */
104108
void notify_all() {
105-
std::unique_lock<std::mutex> lock{ this->_lock };
106-
this->_is_waitable = true;
109+
this->_is_waitable.store(true, std::memory_order_release);
107110
this->_cond.notify_all();
108111
}
109112

110113
/** Tell all listeners to stop */
111114
void kill_all() {
112-
std::unique_lock<std::mutex> lock{ this->_lock };
113-
this->_is_waitable = false;
115+
this->_is_waitable.store(false, std::memory_order_release);
114116
this->_cond.notify_all();
115117
}
116118

0 commit comments

Comments
 (0)