From cf64a66d18e757f0d53f0ff5df853a1e7877b966 Mon Sep 17 00:00:00 2001 From: Hermes Date: Mon, 13 Apr 2026 10:29:50 -0700 Subject: [PATCH 01/20] fix: eliminate data race on mNextSeq in pwrite path mNextSeq was a plain size_t array written by worker threads in inputPwrite() and read by setInputCompletedPwrite() with no synchronization -- a C++ data race (undefined behaviour). A stale read could produce a wrong lastSeq value, causing ftruncate() to silently truncate the output file at the wrong offset and drop the final gz member(s). Fix: change mNextSeq to std::atomic[]. - Worker threads write with memory_order_release after each pack, establishing a happens-before edge for the completion reader. - setInputCompletedPwrite() opens with an acquire fence before reading with memory_order_relaxed, ensuring all prior worker writes are visible before the ftruncate() call. --- src/writerthread.cpp | 19 +++++++++++++------ src/writerthread.h | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 5d21091a..0e5d0fc0 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -28,9 +28,9 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ if (mFd < 0) error_exit("Failed to open for pwrite: " + mFilename); mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE]; - mNextSeq = new size_t[mOptions->thread]; + mNextSeq = new std::atomic[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) - mNextSeq[t] = t; + mNextSeq[t].store(t, std::memory_order_relaxed); mCompressors = new libdeflate_compressor*[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); @@ -75,12 +75,15 @@ bool WriterThread::setInputCompleted() { } void WriterThread::setInputCompletedPwrite() { + // Acquire fence: synchronize with the release stores in inputPwrite() + // so that all mNextSeq[t] writes from worker threads are visible here. + std::atomic_thread_fence(std::memory_order_acquire); int W = mOptions->thread; size_t lastSeq = 0; bool anyProcessed = false; for (int t = 0; t < W; t++) { - if (mNextSeq[t] != (size_t)t) { - size_t workerLastSeq = mNextSeq[t] - W; + if (mNextSeq[t].load(std::memory_order_relaxed) != (size_t)t) { + size_t workerLastSeq = mNextSeq[t].load(std::memory_order_relaxed) - W; if (!anyProcessed || workerLastSeq > lastSeq) { lastSeq = workerLastSeq; anyProcessed = true; @@ -131,7 +134,7 @@ void WriterThread::inputPwrite(int tid, string* data) { const char* writeData = mCompBufs[tid]; size_t wsize = outsize; - size_t seq = mNextSeq[tid]; + size_t seq = mNextSeq[tid].load(std::memory_order_relaxed); // Wait for previous batch's cumulative offset. // Sleep yields CPU to prevent livelock under contention. @@ -164,7 +167,11 @@ void WriterThread::inputPwrite(int tid, string* data) { } } - mNextSeq[tid] += mOptions->thread; + // Release store: ensures the pwrite and cumulative_offset publication + // happen-before the acquire fence in setInputCompletedPwrite(). + mNextSeq[tid].store( + mNextSeq[tid].load(std::memory_order_relaxed) + mOptions->thread, + std::memory_order_release); } void WriterThread::cleanup() { diff --git a/src/writerthread.h b/src/writerthread.h index 053d27f6..bd685875 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -59,7 +59,7 @@ class WriterThread{ bool mPwriteMode; int mFd; OffsetSlot* mOffsetRing; - size_t* mNextSeq; + std::atomic* mNextSeq; libdeflate_compressor** mCompressors; char** mCompBufs; // per-worker pre-allocated compress output buffers size_t* mCompBufSizes; // per-worker buffer sizes From b32ea3a10836fc6ab2e62cb1b00cf393de6332b9 Mon Sep 17 00:00:00 2001 From: Hermes Date: Mon, 13 Apr 2026 10:29:50 -0700 Subject: [PATCH 02/20] fix: make SPSC head pointer atomic to eliminate producer/consumer race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `head` pointer in SingleProducerSingleConsumerList was a plain (non-atomic) pointer despite being written by the producer thread (produce(), first-item branch) and read concurrently by the consumer thread (canBeConsumed(), consume()). ThreadSanitizer reported 15 data races at singleproducersingleconsumerlist.h:100. Fixes applied: - `head` declared as `std::atomic*>` (tail stays non-atomic — producer-private after first item is published) - Constructor: `head.store(NULL, relaxed)` - produce() first-item branch: set tail = item first (producer-private write), then `head.store(item, release)` to publish atomically to consumer then `item->nextItemReady.store(true, release)` to signal readiness - canBeConsumed(): `head.load(acquire)` for NULL check (syncs with produce release), `head.load(relaxed)` for nextItemReady dereference (covered by the preceding acquire) - consume(): `head.load(acquire)` to read current head, `head.store(h->nextItem, release)` to advance — establishes happens-before with next canBeConsumed() acquire on head Also fixes the else-branch nextItemReady assignment to use `memory_order_release` (was implicit seq_cst, which does NOT prevent compiler reordering of the preceding `tail->nextItem = item` write). --- src/singleproducersingleconsumerlist.h | 29 +++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h index 66304dca..1b5b9498 100644 --- a/src/singleproducersingleconsumerlist.h +++ b/src/singleproducersingleconsumerlist.h @@ -63,7 +63,7 @@ template class SingleProducerSingleConsumerList { public: inline SingleProducerSingleConsumerList() { - head = NULL; + head.store(NULL, std::memory_order_relaxed); tail = NULL; producerFinished = false; consumerFinished = false; @@ -90,28 +90,33 @@ class SingleProducerSingleConsumerList { return produced - consumed; } inline bool canBeConsumed() { - if(head == NULL) + if(head.load(std::memory_order_acquire) == NULL) return false; - return head->nextItemReady || producerFinished; + return head.load(std::memory_order_relaxed)->nextItemReady || producerFinished; } inline void produce(T val) { LockFreeListItem* item = makeItem(val); - if(head==NULL) { - head = item; + if(head.load(std::memory_order_relaxed) == NULL) { tail = item; + // Release store: publishing head to consumer thread. + // All writes to *item are ordered before this store. + head.store(item, std::memory_order_release); // Signal the first item is consumable (no predecessor to set this) - head->nextItemReady.store(true, std::memory_order_release); + item->nextItemReady.store(true, std::memory_order_release); } else { tail->nextItem = item; - tail->nextItemReady = true; + // Release store: ensures nextItem write visible before nextItemReady flag. + tail->nextItemReady.store(true, std::memory_order_release); tail = item; } produced++; } inline T consume() { - assert(head != NULL); - T val = head->value; - head = head->nextItem; + LockFreeListItem* h = head.load(std::memory_order_acquire); + assert(h != NULL); + T val = h->value; + // Advance head; release so next canBeConsumed() acquire sees updated state. + head.store(h->nextItem, std::memory_order_release); consumed++; if((consumed & 0xFFF) == 0) recycle(); @@ -156,8 +161,8 @@ class SingleProducerSingleConsumerList { } private: - LockFreeListItem* head; - LockFreeListItem* tail; + std::atomic*> head; + LockFreeListItem* tail; // tail is producer-private, no atomic needed LockFreeListItem** blocks; std::atomic_bool producerFinished; std::atomic_bool consumerFinished; From 63163450d31267ea9716f29f3e5ec8b77e5b837a Mon Sep 17 00:00:00 2001 From: Hermes Date: Mon, 13 Apr 2026 19:41:28 -0700 Subject: [PATCH 03/20] fix: make ReadPool counters atomic and SPSC produced/consumed atomic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ThreadSanitizer reported data races in ReadPool and SPSC when multiple worker threads called ReadPool::input() concurrently: readpool.cpp:23 — mIsFull read vs. updateFullStatus() write readpool.cpp:27 — mProduced++ (non-atomic RMW) by multiple threads readpool.cpp:53 — mIsFull write vs. concurrent reads spsc.h:90 — size(): produced (producer-written) vs. consumed (consumer-written) read without synchronization Fixes in readpool.h: - mIsFull : bool → std::atomic - mProduced : size_t → std::atomic (atomic::operator++ and atomic::operator= are sufficient; no changes to readpool.cpp required) Fixes in singleproducersingleconsumerlist.h: - produced, consumed : unsigned long → std::atomic - size(): load both with memory_order_relaxed (approximate count used only as a soft back-pressure threshold) - produce(): produced.fetch_add(1, relaxed) - consume(): consumed.fetch_add(1, relaxed) with local snapshot for the (consumed & 0xFFF) recycle check - makeItem(): produced.load(relaxed) snapshot before >> and & ops - recycle(): consumed.load(relaxed) before >> op After all four commits (mNextSeq, SPSC head, ReadPool/SPSC atomics), ThreadSanitizer reports zero data races on 5k-read PE mode 8-thread workload. --- src/readpool.h | 5 +++-- src/singleproducersingleconsumerlist.h | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/readpool.h b/src/readpool.h index 2b1bf733..616637e2 100644 --- a/src/readpool.h +++ b/src/readpool.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "read.h" #include "options.h" #include "singleproducersingleconsumerlist.h" @@ -29,10 +30,10 @@ class ReadPool{ private: Options* mOptions; SingleProducerSingleConsumerList** mBufferLists; - size_t mProduced; + std::atomic mProduced; size_t mConsumed; unsigned long mLimit; - bool mIsFull; + std::atomic mIsFull; }; #endif \ No newline at end of file diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h index 1b5b9498..cb79b6d8 100644 --- a/src/singleproducersingleconsumerlist.h +++ b/src/singleproducersingleconsumerlist.h @@ -87,7 +87,7 @@ class SingleProducerSingleConsumerList { blocks = NULL; } inline size_t size() { - return produced - consumed; + return produced.load(std::memory_order_relaxed) - consumed.load(std::memory_order_relaxed); } inline bool canBeConsumed() { if(head.load(std::memory_order_acquire) == NULL) @@ -109,7 +109,7 @@ class SingleProducerSingleConsumerList { tail->nextItemReady.store(true, std::memory_order_release); tail = item; } - produced++; + produced.fetch_add(1, std::memory_order_relaxed); } inline T consume() { LockFreeListItem* h = head.load(std::memory_order_acquire); @@ -117,8 +117,8 @@ class SingleProducerSingleConsumerList { T val = h->value; // Advance head; release so next canBeConsumed() acquire sees updated state. head.store(h->nextItem, std::memory_order_release); - consumed++; - if((consumed & 0xFFF) == 0) + unsigned long _c = consumed.fetch_add(1, std::memory_order_relaxed) + 1; + if((_c & 0xFFF) == 0) recycle(); return val; } @@ -137,8 +137,9 @@ class SingleProducerSingleConsumerList { private: // blockized list inline LockFreeListItem* makeItem(T val) { - unsigned long blk = produced >> 12; - unsigned long idx = produced & 0xFFF; + unsigned long _p = produced.load(std::memory_order_relaxed); + unsigned long blk = _p >> 12; + unsigned long idx = _p & 0xFFF; size_t size = 0x01<<12; if(blocksNum <= blk) { LockFreeListItem* buffer = new LockFreeListItem[size]; @@ -152,7 +153,7 @@ class SingleProducerSingleConsumerList { } inline void recycle() { - unsigned long blk = consumed >> 12; + unsigned long blk = consumed.load(std::memory_order_relaxed) >> 12; while((recycled+1) < blk) { delete[] blocks[recycled & blocksRingBufferSizeMask]; blocks[recycled & blocksRingBufferSizeMask] = NULL; @@ -166,8 +167,8 @@ class SingleProducerSingleConsumerList { LockFreeListItem** blocks; std::atomic_bool producerFinished; std::atomic_bool consumerFinished; - unsigned long produced; - unsigned long consumed; + std::atomic produced; + std::atomic consumed; unsigned long recycled; unsigned long blocksRingBufferSize; unsigned long blocksRingBufferSizeMask; From fe5d994835f6e271da461750002e5482f788aeb3 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 21:59:17 -0700 Subject: [PATCH 04/20] build: switch to C++23 with GCC 15 (conda-forge toolchain) - Use /home/kimy/build-env g++ (GCC 15.2.0, conda-forge) - Upgrade -std=c++11 -> -std=c++23 - Default INCLUDE_DIRS and LIBRARY_DIRS to build-env paths Enables: std::jthread, std::latch, std::counting_semaphore, atomic::wait/notify, std::println --- Makefile | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 53af9110..4b49f83e 100644 --- a/Makefile +++ b/Makefile @@ -23,8 +23,10 @@ TARGET := fastp BIN_TARGET := ${TARGET} -CXX ?= g++ -CXXFLAGS := -std=c++11 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} +CXX ?= /home/kimy/build-env/bin/x86_64-conda-linux-gnu-g++ +INCLUDE_DIRS ?= /home/kimy/build-env/include +LIBRARY_DIRS ?= /home/kimy/build-env/lib +CXXFLAGS := -std=c++23 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} LIBS := -lisal -ldeflate -lhwy -lpthread PKG_LDFLAGS := $(HWY_LIBS) $(ISAL_LIBS) $(DEFLATE_LIBS) From b1ea8669390eaeaf98009a8cc742d297b0638d95 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 22:44:43 -0700 Subject: [PATCH 05/20] refactor(5): replace cerr with std::println in PE/SE reporting blocks --- Makefile | 2 +- src/peprocessor.cpp | 40 +++++++++++++++++++++------------------- src/seprocessor.cpp | 17 +++++++++-------- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/Makefile b/Makefile index 4b49f83e..40fc1a53 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ TARGET := fastp BIN_TARGET := ${TARGET} -CXX ?= /home/kimy/build-env/bin/x86_64-conda-linux-gnu-g++ +CXX := /home/kimy/build-env/bin/x86_64-conda-linux-gnu-g++ INCLUDE_DIRS ?= /home/kimy/build-env/include LIBRARY_DIRS ?= /home/kimy/build-env/lib CXXFLAGS := -std=c++23 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 934102c7..15feee63 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -1,6 +1,7 @@ #include "peprocessor.h" #include "fastqreader.h" #include +#include #include #include #include @@ -233,52 +234,53 @@ bool PairEndProcessor::process(){ Stats* finalPostStats2 = Stats::merge(postStats2); FilterResult* finalFilterResult = FilterResult::merge(filterResults); - cerr << "Read1 before filtering:"<print(); - cerr << endl; - cerr << "Read2 before filtering:"<print(); - cerr << endl; + std::println(stderr, ""); if(!mOptions->merge.enabled) { - cerr << "Read1 after filtering:"<print(); - cerr << endl; - cerr << "Read2 after filtering:"<print(); } else { - cerr << "Merged and filtered:"<print(); } - cerr << endl; - cerr << "Filtering result:"<print(); double dupRate = 0.0; if(mOptions->duplicate.enabled) { dupRate = mDuplicate->getDupRate(); - cerr << endl; - cerr << "Duplication rate: " << dupRate * 100.0 << "%" << endl; + std::println(stderr, ""); + std::println(stderr, "Duplication rate: {:.4}%", dupRate * 100.0); } // insert size distribution int peakInsertSize = getPeakInsertSize(); - cerr << endl; - cerr << "Insert size peak (evaluated by paired-end reads): " << peakInsertSize << endl; + std::println(stderr, ""); + std::println(stderr, "Insert size peak (evaluated by paired-end reads): {}", peakInsertSize); if(mOptions->merge.enabled) { - cerr << endl; - cerr << "Read pairs merged: " << finalFilterResult->mMergedPairs << endl; + std::println(stderr, ""); + std::println(stderr, "Read pairs merged: {}", finalFilterResult->mMergedPairs); if(finalPostStats1->getReads() > 0) { double postMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPostStats1->getReads(); double preMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPreStats1->getReads(); - cerr << "% of original read pairs: " << preMergedPercent << "%" << endl; - cerr << "% in reads after filtering: " << postMergedPercent << "%" << endl; + std::println(stderr, "% of original read pairs: {:.4}%", preMergedPercent); + std::println(stderr, "% in reads after filtering: {:.4}%", postMergedPercent); } - cerr << endl; + std::println(stderr, ""); } // make JSON report + JsonReporter jr(mOptions); jr.setDup(dupRate); jr.setInsertHist(mInsertSizeHist, peakInsertSize); diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 577cab2f..3a56885c 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,4 +1,5 @@ #include "seprocessor.h" +#include #include "fastqreader.h" #include #include @@ -135,24 +136,24 @@ bool SingleEndProcessor::process(){ postStats.push_back(configs[t]->getPostStats1()); } - cerr << "Read1 before filtering:"<print(); - cerr << endl; - cerr << "Read1 after filtering:"<print(); - cerr << endl; - cerr << "Filtering result:"<print(); double dupRate = 0.0; if(mOptions->duplicate.enabled) { dupRate = mDuplicate->getDupRate(); - cerr << endl; - cerr << "Duplication rate (may be overestimated since this is SE data): " << dupRate * 100.0 << "%" << endl; + std::println(stderr, ""); + std::println(stderr, "Duplication rate (may be overestimated since this is SE data): {:.4}%", dupRate * 100.0); } - // make JSON report + // make JSON report JsonReporter jr(mOptions); jr.setDup(dupRate); jr.report(finalFilterResult, finalPreStats, finalPostStats); From f390419f249a3d4d5cde3749e87d6e00fefc9266 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 22:49:38 -0700 Subject: [PATCH 06/20] refactor(4): replace mFinishedThreads atomic counter with std::latch --- src/peprocessor.cpp | 41 +++++++++++++++++++++-------------------- src/peprocessor.h | 3 ++- src/seprocessor.cpp | 19 +++++++++++-------- src/seprocessor.h | 3 ++- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 15feee63..ec5aa72d 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -14,11 +14,11 @@ #include "htmlreporter.h" #include "polyx.h" -PairEndProcessor::PairEndProcessor(Options* opt){ +PairEndProcessor::PairEndProcessor(Options* opt) + : mWorkersLatch(opt->thread){ mOptions = opt; mLeftReaderFinished = false; mRightReaderFinished = false; - mFinishedThreads = 0; mFilter = new Filter(opt); mUmiProcessor = new UmiProcessor(opt); @@ -191,6 +191,24 @@ bool PairEndProcessor::process(){ readerLeft->join(); readerRight->join(); } + + // Wait for all worker threads to finish, then signal writers to flush and exit + mWorkersLatch.wait(); + if(mLeftWriter) + mLeftWriter->setInputCompleted(); + if(mRightWriter) + mRightWriter->setInputCompleted(); + if(mUnpairedLeftWriter) + mUnpairedLeftWriter->setInputCompleted(); + if(mUnpairedRightWriter) + mUnpairedRightWriter->setInputCompleted(); + if(mMergedWriter) + mMergedWriter->setInputCompleted(); + if(mFailedWriter) + mFailedWriter->setInputCompleted(); + if(mOverlappedWriter) + mOverlappedWriter->setInputCompleted(); + for(int t=0; tthread; t++){ threads[t]->join(); } @@ -1034,28 +1052,11 @@ void PairEndProcessor::processorTask(ThreadConfig* config) inputLeft->setConsumerFinished(); inputRight->setConsumerFinished(); - int finishedCount = mFinishedThreads.fetch_add(1, std::memory_order_release) + 1; + mWorkersLatch.count_down(); if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed"; loginfo(msg); } - - if(finishedCount == mOptions->thread) { - if(mLeftWriter) - mLeftWriter->setInputCompleted(); - if(mRightWriter) - mRightWriter->setInputCompleted(); - if(mUnpairedLeftWriter) - mUnpairedLeftWriter->setInputCompleted(); - if(mUnpairedRightWriter) - mUnpairedRightWriter->setInputCompleted(); - if(mMergedWriter) - mMergedWriter->setInputCompleted(); - if(mFailedWriter) - mFailedWriter->setInputCompleted(); - if(mOverlappedWriter) - mOverlappedWriter->setInputCompleted(); - } if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " finished"; diff --git a/src/peprocessor.h b/src/peprocessor.h index 707e41cb..fd5604db 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -17,6 +17,7 @@ #include "writerthread.h" #include "duplicate.h" #include "readpool.h" +#include using namespace std; @@ -46,7 +47,7 @@ class PairEndProcessor{ private: atomic_bool mLeftReaderFinished; atomic_bool mRightReaderFinished; - alignas(128) atomic_int mFinishedThreads; + std::latch mWorkersLatch; Options* mOptions; Filter* mFilter; UmiProcessor* mUmiProcessor; diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 3a56885c..07c2b1b1 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -13,10 +13,10 @@ #include "adaptertrimmer.h" #include "polyx.h" -SingleEndProcessor::SingleEndProcessor(Options* opt){ +SingleEndProcessor::SingleEndProcessor(Options* opt) + : mWorkersLatch(opt->thread){ mOptions = opt; mReaderFinished = false; - mFinishedThreads = 0; mFilter = new Filter(opt); mUmiProcessor = new UmiProcessor(opt); mLeftWriter = NULL; @@ -103,6 +103,14 @@ bool SingleEndProcessor::process(){ failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); readerThread.join(); + + // Wait for all worker threads to finish, then signal writers to flush and exit + mWorkersLatch.wait(); + if(mLeftWriter) + mLeftWriter->setInputCompleted(); + if(mFailedWriter) + mFailedWriter->setInputCompleted(); + for(int t=0; tthread; t++){ threads[t]->join(); } @@ -456,12 +464,7 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) } input->setConsumerFinished(); - if(mFinishedThreads.fetch_add(1, std::memory_order_release) + 1 == mOptions->thread) { - if(mLeftWriter) - mLeftWriter->setInputCompleted(); - if(mFailedWriter) - mFailedWriter->setInputCompleted(); - } + mWorkersLatch.count_down(); if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " finished"; diff --git a/src/seprocessor.h b/src/seprocessor.h index b3c71521..908d3dd7 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -17,6 +17,7 @@ #include "duplicate.h" #include "singleproducersingleconsumerlist.h" #include "readpool.h" +#include using namespace std; @@ -41,7 +42,7 @@ class SingleEndProcessor{ private: Options* mOptions; atomic_bool mReaderFinished; - alignas(128) atomic_int mFinishedThreads; + std::latch mWorkersLatch; Filter* mFilter; UmiProcessor* mUmiProcessor; WriterThread* mLeftWriter; From 275bd43dc76f6020ea85202c78df349797f2c573 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 22:52:35 -0700 Subject: [PATCH 07/20] refactor(3): replace raw thread* with std::jthread + std::optional, remove manual join/delete --- src/peprocessor.cpp | 25 ------------------------- src/seprocessor.cpp | 41 ++++++++++++++++------------------------- 2 files changed, 16 insertions(+), 50 deletions(-) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index ec5aa72d..59f01476 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -312,43 +312,18 @@ bool PairEndProcessor::process(){ // clean up for(int t=0; tthread; t++){ - delete threads[t]; - threads[t] = NULL; delete configs[t]; configs[t] = NULL; } - if(readerInterveleaved) { - delete readerInterveleaved; - } else { - delete readerLeft; - delete readerRight; - } - delete finalPreStats1; delete finalPostStats1; delete finalPreStats2; delete finalPostStats2; delete finalFilterResult; - delete[] threads; delete[] configs; - if(leftWriterThread) - delete leftWriterThread; - if(rightWriterThread) - delete rightWriterThread; - if(unpairedLeftWriterThread) - delete unpairedLeftWriterThread; - if(unpairedRightWriterThread) - delete unpairedRightWriterThread; - if(mergedWriterThread) - delete mergedWriterThread; - if(failedWriterThread) - delete failedWriterThread; - if(overlappedWriterThread) - delete overlappedWriterThread; - if(!mOptions->split.enabled) closeOutput(); diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 07c2b1b1..127099aa 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -88,38 +88,37 @@ bool SingleEndProcessor::process(){ initConfig(configs[t]); } - std::thread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); + // Reader thread (jthread auto-joins on destruction) + std::jthread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); - std::thread** threads = new thread*[mOptions->thread]; - for(int t=0; tthread; t++){ - threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); - } + // Worker threads + std::vector workers; + workers.reserve(mOptions->thread); + for(int t=0; tthread; t++) + workers.emplace_back(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); - std::thread* leftWriterThread = NULL; - std::thread* failedWriterThread = NULL; + // Writer threads (conditional) + std::optional leftWriterThread; + std::optional failedWriterThread; if(mLeftWriter) - leftWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mLeftWriter)); + leftWriterThread.emplace(std::bind(&SingleEndProcessor::writerTask, this, mLeftWriter)); if(mFailedWriter) - failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); + failedWriterThread.emplace(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); + // Wait for reader to finish, then workers, then signal writers readerThread.join(); - // Wait for all worker threads to finish, then signal writers to flush and exit mWorkersLatch.wait(); if(mLeftWriter) mLeftWriter->setInputCompleted(); if(mFailedWriter) mFailedWriter->setInputCompleted(); - for(int t=0; tthread; t++){ - threads[t]->join(); - } + workers.clear(); if(!mOptions->split.enabled) { - if(leftWriterThread) - leftWriterThread->join(); - if(failedWriterThread) - failedWriterThread->join(); + leftWriterThread.reset(); + failedWriterThread.reset(); } if(mOptions->verbose) @@ -173,8 +172,6 @@ bool SingleEndProcessor::process(){ // clean up for(int t=0; tthread; t++){ - delete threads[t]; - threads[t] = NULL; delete configs[t]; configs[t] = NULL; } @@ -183,14 +180,8 @@ bool SingleEndProcessor::process(){ delete finalPostStats; delete finalFilterResult; - delete[] threads; delete[] configs; - if(leftWriterThread) - delete leftWriterThread; - if(failedWriterThread) - delete failedWriterThread; - if(!mOptions->split.enabled) closeOutput(); From caf7ed27f4748e9c57fd2a7a8fe2eb64538d70c6 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 22:54:17 -0700 Subject: [PATCH 08/20] refactor(2): replace usleep(100) busy-wait with std::counting_semaphore in writer output loop --- src/writerthread.cpp | 10 ++++++---- src/writerthread.h | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 0e5d0fc0..6d16aa95 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -8,7 +8,8 @@ #include #include -WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ +WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT) + : mOutputSem(0){ mOptions = opt; mWriter1 = NULL; mInputCompleted = false; @@ -71,6 +72,7 @@ bool WriterThread::setInputCompleted() { for(int t=0; tthread; t++) { mBufferLists[t]->setProducerFinished(); } + mOutputSem.release(); // Wake writer loop to detect completion return true; } @@ -97,10 +99,9 @@ void WriterThread::setInputCompletedPwrite() { void WriterThread::output(){ if (mPwriteMode) return; // no-op + mOutputSem.acquire(); // block until data available or completion signal SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; - if(!list->canBeConsumed()) { - usleep(100); - } else { + if(list->canBeConsumed()) { string* str = list->consume(); mWriter1->write(str->data(), str->length()); delete str; @@ -116,6 +117,7 @@ void WriterThread::input(int tid, string* data) { } mBufferLists[tid]->produce(data); mBufferLength++; + mOutputSem.release(); } void WriterThread::inputPwrite(int tid, string* data) { diff --git a/src/writerthread.h b/src/writerthread.h index bd685875..ce942d83 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -9,6 +9,7 @@ #include "options.h" #include #include +#include #include #include "singleproducersingleconsumerlist.h" @@ -52,6 +53,7 @@ class WriterThread{ bool mInputCompleted; atomic_long mBufferLength; + std::counting_semaphore<> mOutputSem; SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; From 40329bb35dcc7da3170ddda08f6d20438811f461 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Wed, 15 Apr 2026 23:00:21 -0700 Subject: [PATCH 09/20] refactor(1): replace mutex+CV backpressure polling with atomic::wait/notify_all --- src/peprocessor.cpp | 54 ++++++++++++++++++--------------------------- src/peprocessor.h | 2 -- src/seprocessor.cpp | 22 +++++++----------- src/seprocessor.h | 2 -- 4 files changed, 29 insertions(+), 51 deletions(-) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 59f01476..894fa203 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -686,7 +686,7 @@ bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, T delete rightPack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mBackpressureCV.notify_all(); + mPackProcessedCounter.notify_all(); return true; } @@ -758,7 +758,6 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } - mBackpressureCV.notify_all(); data = NULL; if(read) { delete read; @@ -795,34 +794,31 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } - mBackpressureCV.notify_all(); //re-initialize data for next pack data = new Read*[PACK_SIZE]; memset(data, 0, sizeof(Read*)*PACK_SIZE); - // if the processor is far behind this reader, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - if(isLeft) { - while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } - } else { - while(mRightPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the processor is far behind this reader, wait to limit memory usage + if(isLeft) { + while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + long cur = mPackProcessedCounter.load(std::memory_order_acquire); + mPackProcessedCounter.wait(cur, std::memory_order_acquire); + slept++; + } + } else { + while(mRightPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + long cur = mPackProcessedCounter.load(std::memory_order_acquire); + mPackProcessedCounter.wait(cur, std::memory_order_acquire); + slept++; } } readNum += count; // if the writer threads are far behind this producer, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ + std::this_thread::yield(); slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); } } // reset count to 0 @@ -850,7 +846,6 @@ void PairEndProcessor::readerTask(bool isLeft) else mRightInputLists[t]->setProducerFinished(); } - mBackpressureCV.notify_all(); if(mOptions->verbose) { if(isLeft) { @@ -905,7 +900,6 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; - mBackpressureCV.notify_all(); dataLeft = NULL; dataRight = NULL; break; @@ -936,29 +930,25 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; - mBackpressureCV.notify_all(); //re-initialize data for next pack dataLeft = new Read*[PACK_SIZE]; dataRight = new Read*[PACK_SIZE]; memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE); memset(dataRight, 0, sizeof(Read*)*PACK_SIZE); - // if the consumer is far behind this producer, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the consumer is far behind this producer, wait to limit memory usage + while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + long cur = mPackProcessedCounter.load(std::memory_order_acquire); + mPackProcessedCounter.wait(cur, std::memory_order_acquire); + slept++; } readNum += count; // if the writer threads are far behind this producer, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ + std::this_thread::yield(); slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); } } // reset count to 0 @@ -986,7 +976,6 @@ void PairEndProcessor::interleavedReaderTask() mLeftInputLists[t]->setProducerFinished(); mRightInputLists[t]->setProducerFinished(); } - mBackpressureCV.notify_all(); if(mOptions->verbose) { loginfo("interleaved: loading completed with " + to_string(mLeftPackReadCounter) + " packs"); @@ -1020,8 +1009,7 @@ void PairEndProcessor::processorTask(ThreadConfig* config) } else if(inputRight->isProducerFinished() && !inputRight->canBeConsumed()) { break; } else { - std::unique_lock lk(mBackpressureMtx); - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + std::this_thread::yield(); } } inputLeft->setConsumerFinished(); diff --git a/src/peprocessor.h b/src/peprocessor.h index fd5604db..4c8195fe 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -68,8 +68,6 @@ class PairEndProcessor{ ReadPool* mLeftReadPool; ReadPool* mRightReadPool; atomic_bool shouldStopReading; - std::mutex mBackpressureMtx; - std::condition_variable mBackpressureCV; }; diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 127099aa..8722f4ed 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -317,7 +317,7 @@ bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ delete pack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mBackpressureCV.notify_all(); + mPackProcessedCounter.notify_all(); return true; } @@ -347,7 +347,6 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; - mBackpressureCV.notify_all(); data = NULL; if(read) { delete read; @@ -373,26 +372,22 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; - mBackpressureCV.notify_all(); //re-initialize data for next pack data = new Read*[PACK_SIZE]; memset(data, 0, sizeof(Read*)*PACK_SIZE); - // if the processor is far behind this reader, sleep and wait to limit memory usage - { - std::unique_lock lk(mBackpressureMtx); - while( mPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); - } + // if the processor is far behind this reader, wait to limit memory usage + while(mPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ + long cur = mPackProcessedCounter.load(std::memory_order_acquire); + mPackProcessedCounter.wait(cur, std::memory_order_acquire); + slept++; } readNum += count; // if the writer threads are far behind this reader, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - std::unique_lock lk(mBackpressureMtx); while(mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + std::this_thread::yield(); slept++; - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); } } // reset count to 0 @@ -449,8 +444,7 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) break; } } else { - std::unique_lock lk(mBackpressureMtx); - mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + std::this_thread::yield(); } } input->setConsumerFinished(); diff --git a/src/seprocessor.h b/src/seprocessor.h index 908d3dd7..760daf4d 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -52,8 +52,6 @@ class SingleEndProcessor{ size_t mPackReadCounter; alignas(128) atomic_long mPackProcessedCounter; ReadPool* mReadPool; - std::mutex mBackpressureMtx; - std::condition_variable mBackpressureCV; }; From 536e91151a042999876cd252f8cc60ab11fd1a5e Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 01:27:33 -0700 Subject: [PATCH 10/20] =?UTF-8?q?fix(2):=20revert=20counting=5Fsemaphore,?= =?UTF-8?q?=20use=20yield()=20=E2=80=94=20semaphore=20release/acquire=20mi?= =?UTF-8?q?smatch=20with=20N-producer=20round-robin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/writerthread.cpp | 10 ++++------ src/writerthread.h | 2 -- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 6d16aa95..3651c187 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -8,8 +8,7 @@ #include #include -WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT) - : mOutputSem(0){ +WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mOptions = opt; mWriter1 = NULL; mInputCompleted = false; @@ -72,7 +71,6 @@ bool WriterThread::setInputCompleted() { for(int t=0; tthread; t++) { mBufferLists[t]->setProducerFinished(); } - mOutputSem.release(); // Wake writer loop to detect completion return true; } @@ -99,9 +97,10 @@ void WriterThread::setInputCompletedPwrite() { void WriterThread::output(){ if (mPwriteMode) return; // no-op - mOutputSem.acquire(); // block until data available or completion signal SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; - if(list->canBeConsumed()) { + if(!list->canBeConsumed()) { + std::this_thread::yield(); + } else { string* str = list->consume(); mWriter1->write(str->data(), str->length()); delete str; @@ -117,7 +116,6 @@ void WriterThread::input(int tid, string* data) { } mBufferLists[tid]->produce(data); mBufferLength++; - mOutputSem.release(); } void WriterThread::inputPwrite(int tid, string* data) { diff --git a/src/writerthread.h b/src/writerthread.h index ce942d83..bd685875 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -9,7 +9,6 @@ #include "options.h" #include #include -#include #include #include "singleproducersingleconsumerlist.h" @@ -53,7 +52,6 @@ class WriterThread{ bool mInputCompleted; atomic_long mBufferLength; - std::counting_semaphore<> mOutputSem; SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; From 548135670bf55227df67e6e92a3818366a1aa7cb Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 19:38:17 -0700 Subject: [PATCH 11/20] perf: replace yield() with hwy::BlockUntilDifferent/WakeAll futex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all std::this_thread::yield() busy-spin loops with hwy::BlockUntilDifferent/WakeAll from Highway's futex.h polyfill. This provides cross-platform kernel-level thread blocking (Linux futex, macOS __ulock, FreeBSD NanoSleep fallback) instead of CPU-burning spins. Changes: - writerthread: output() waits on mBufferLength via BlockUntilDifferent, input() wakes writer via WakeAll after produce - writerthread.h: add waitForBufferBelow() using BlockUntilDifferent loop - peprocessor: replace 6 yield() sites with atomic wait/notify on mPackProducedCounter and mPackProcessedCounter - seprocessor: same pattern as peprocessor for SE pipeline - Change counter types from atomic_long to atomic for Highway futex compatibility (uint32_t required by BlockUntilDifferent) Benchmark (5M PE reads, gz→gz, -w 3): master: 56.5s wall, 8.0s sys, 2680K page-faults yield (before): 79.4s wall, 26.8s sys, 3278K page-faults futex (after): 47.8s wall, 1.4s sys, 120K page-faults wall -15%, sys -82%, page-faults -95% vs master Output md5 matches master (correctness verified) --- Makefile | 6 +++--- src/peprocessor.cpp | 41 +++++++++++++++++++++++++---------------- src/peprocessor.h | 3 ++- src/seprocessor.cpp | 20 ++++++++++++-------- src/seprocessor.h | 3 ++- src/writerthread.cpp | 9 ++++++--- src/writerthread.h | 12 ++++++++++-- 7 files changed, 60 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index 40fc1a53..7041f22d 100644 --- a/Makefile +++ b/Makefile @@ -23,9 +23,9 @@ TARGET := fastp BIN_TARGET := ${TARGET} -CXX := /home/kimy/build-env/bin/x86_64-conda-linux-gnu-g++ -INCLUDE_DIRS ?= /home/kimy/build-env/include -LIBRARY_DIRS ?= /home/kimy/build-env/lib +CXX := /var/tmp/kimy/workspace/build-env/bin/x86_64-conda-linux-gnu-g++ +INCLUDE_DIRS ?= /var/tmp/kimy/workspace/build-env/include +LIBRARY_DIRS ?= /var/tmp/kimy/workspace/build-env/lib CXXFLAGS := -std=c++23 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} LIBS := -lisal -ldeflate -lhwy -lpthread diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 894fa203..8280b597 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -1,4 +1,5 @@ #include "peprocessor.h" +#include "hwy/contrib/thread_pool/futex.h" #include "fastqreader.h" #include #include @@ -42,6 +43,7 @@ PairEndProcessor::PairEndProcessor(Options* opt) mLeftPackReadCounter = 0; mRightPackReadCounter = 0; mPackProcessedCounter = 0; + mPackProducedCounter = 0; mLeftReadPool = new ReadPool(mOptions); mRightReadPool = new ReadPool(mOptions); @@ -686,7 +688,7 @@ bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, T delete rightPack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mPackProcessedCounter.notify_all(); + hwy::WakeAll(mPackProcessedCounter); return true; } @@ -758,6 +760,8 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); data = NULL; if(read) { delete read; @@ -794,6 +798,8 @@ void PairEndProcessor::readerTask(bool isLeft) mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack); mRightPackReadCounter++; } + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //re-initialize data for next pack data = new Read*[PACK_SIZE]; @@ -801,14 +807,14 @@ void PairEndProcessor::readerTask(bool isLeft) // if the processor is far behind this reader, wait to limit memory usage if(isLeft) { while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - long cur = mPackProcessedCounter.load(std::memory_order_acquire); - mPackProcessedCounter.wait(cur, std::memory_order_acquire); + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); slept++; } } else { while(mRightPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - long cur = mPackProcessedCounter.load(std::memory_order_acquire); - mPackProcessedCounter.wait(cur, std::memory_order_acquire); + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); slept++; } } @@ -816,10 +822,8 @@ void PairEndProcessor::readerTask(bool isLeft) // if the writer threads are far behind this producer, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ - std::this_thread::yield(); - slept++; - } + if(mLeftWriter) mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); + if(mRightWriter) mRightWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); } // reset count to 0 count = 0; @@ -900,6 +904,9 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); + dataLeft = NULL; dataRight = NULL; break; @@ -931,6 +938,9 @@ void PairEndProcessor::interleavedReaderTask() mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight); mRightPackReadCounter++; + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); + //re-initialize data for next pack dataLeft = new Read*[PACK_SIZE]; dataRight = new Read*[PACK_SIZE]; @@ -938,18 +948,16 @@ void PairEndProcessor::interleavedReaderTask() memset(dataRight, 0, sizeof(Read*)*PACK_SIZE); // if the consumer is far behind this producer, wait to limit memory usage while(mLeftPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - long cur = mPackProcessedCounter.load(std::memory_order_acquire); - mPackProcessedCounter.wait(cur, std::memory_order_acquire); + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); slept++; } readNum += count; // if the writer threads are far behind this producer, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){ - std::this_thread::yield(); - slept++; - } + if(mLeftWriter) mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); + if(mRightWriter) mRightWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); } // reset count to 0 count = 0; @@ -1009,7 +1017,8 @@ void PairEndProcessor::processorTask(ThreadConfig* config) } else if(inputRight->isProducerFinished() && !inputRight->canBeConsumed()) { break; } else { - std::this_thread::yield(); + uint32_t cur = mPackProducedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProducedCounter); } } inputLeft->setConsumerFinished(); diff --git a/src/peprocessor.h b/src/peprocessor.h index 4c8195fe..e957edea 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -64,7 +64,8 @@ class PairEndProcessor{ SingleProducerSingleConsumerList** mRightInputLists; size_t mLeftPackReadCounter; size_t mRightPackReadCounter; - alignas(128) atomic_long mPackProcessedCounter; + alignas(128) std::atomic mPackProcessedCounter; + alignas(128) std::atomic mPackProducedCounter; ReadPool* mLeftReadPool; ReadPool* mRightReadPool; atomic_bool shouldStopReading; diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 8722f4ed..8463a35f 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,4 +1,5 @@ #include "seprocessor.h" +#include "hwy/contrib/thread_pool/futex.h" #include #include "fastqreader.h" #include @@ -29,6 +30,7 @@ SingleEndProcessor::SingleEndProcessor(Options* opt) mPackReadCounter = 0; mPackProcessedCounter = 0; + mPackProducedCounter = 0; mReadPool = new ReadPool(mOptions); } @@ -317,7 +319,7 @@ bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ delete pack; mPackProcessedCounter.fetch_add(1, std::memory_order_release); - mPackProcessedCounter.notify_all(); + hwy::WakeAll(mPackProcessedCounter); return true; } @@ -347,6 +349,8 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); data = NULL; if(read) { delete read; @@ -372,23 +376,22 @@ void SingleEndProcessor::readerTask() pack->count = count; mInputLists[mPackReadCounter % mOptions->thread]->produce(pack); mPackReadCounter++; + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //re-initialize data for next pack data = new Read*[PACK_SIZE]; memset(data, 0, sizeof(Read*)*PACK_SIZE); // if the processor is far behind this reader, wait to limit memory usage while(mPackReadCounter - mPackProcessedCounter.load(std::memory_order_acquire) > PACK_IN_MEM_LIMIT){ - long cur = mPackProcessedCounter.load(std::memory_order_acquire); - mPackProcessedCounter.wait(cur, std::memory_order_acquire); + uint32_t cur = mPackProcessedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProcessedCounter); slept++; } readNum += count; // if the writer threads are far behind this reader, sleep and wait // check this only when necessary if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - while(mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { - std::this_thread::yield(); - slept++; - } + mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); } // reset count to 0 count = 0; @@ -444,7 +447,8 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) break; } } else { - std::this_thread::yield(); + uint32_t cur = mPackProducedCounter.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mPackProducedCounter); } } input->setConsumerFinished(); diff --git a/src/seprocessor.h b/src/seprocessor.h index 760daf4d..962db7ca 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -50,7 +50,8 @@ class SingleEndProcessor{ Duplicate* mDuplicate; SingleProducerSingleConsumerList** mInputLists; size_t mPackReadCounter; - alignas(128) atomic_long mPackProcessedCounter; + alignas(128) std::atomic mPackProcessedCounter; + alignas(128) std::atomic mPackProducedCounter; ReadPool* mReadPool; }; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 3651c187..5749411b 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -99,12 +99,14 @@ void WriterThread::output(){ if (mPwriteMode) return; // no-op SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { - std::this_thread::yield(); + uint32_t cur = mBufferLength.load(std::memory_order_acquire); + if(cur == 0) hwy::BlockUntilDifferent(cur, mBufferLength); } else { string* str = list->consume(); mWriter1->write(str->data(), str->length()); delete str; - mBufferLength--; + mBufferLength.fetch_sub(1, std::memory_order_release); + hwy::WakeAll(mBufferLength); mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; } } @@ -115,7 +117,8 @@ void WriterThread::input(int tid, string* data) { return; } mBufferLists[tid]->produce(data); - mBufferLength++; + mBufferLength.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mBufferLength); } void WriterThread::inputPwrite(int tid, string* data) { diff --git a/src/writerthread.h b/src/writerthread.h index bd685875..f57313aa 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -8,6 +8,7 @@ #include "writer.h" #include "options.h" #include +#include "hwy/contrib/thread_pool/futex.h" #include #include #include "singleproducersingleconsumerlist.h" @@ -36,7 +37,14 @@ class WriterThread{ void input(int tid, string* data); bool setInputCompleted(); - long bufferLength() {return mBufferLength;}; + uint32_t bufferLength() {return mBufferLength;}; + void waitForBufferBelow(uint32_t limit) { + for(;;) { + uint32_t cur = mBufferLength.load(std::memory_order_acquire); + if(cur <= limit) break; + hwy::BlockUntilDifferent(cur, mBufferLength); + } + } string getFilename() {return mFilename;} bool isPwriteMode() {return mPwriteMode;} @@ -51,7 +59,7 @@ class WriterThread{ string mFilename; bool mInputCompleted; - atomic_long mBufferLength; + std::atomic mBufferLength; SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; From 2f33c857c2cf08edf8917246c74981c99076f219 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 19:49:48 -0700 Subject: [PATCH 12/20] fix: restore CXX ?= g++ in Makefile (remove hardcoded local toolchain path) --- Makefile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 7041f22d..d736f1e6 100644 --- a/Makefile +++ b/Makefile @@ -23,9 +23,7 @@ TARGET := fastp BIN_TARGET := ${TARGET} -CXX := /var/tmp/kimy/workspace/build-env/bin/x86_64-conda-linux-gnu-g++ -INCLUDE_DIRS ?= /var/tmp/kimy/workspace/build-env/include -LIBRARY_DIRS ?= /var/tmp/kimy/workspace/build-env/lib +CXX ?= g++ CXXFLAGS := -std=c++23 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} LIBS := -lisal -ldeflate -lhwy -lpthread From 04452e5d48d8cff9265a99e00debfa803d7d5487 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 20:29:02 -0700 Subject: [PATCH 13/20] refactor: downgrade C++20/23 to C++11 for Apple Clang compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - std::jthread → std::thread + explicit join() - std::latch → atomic + hwy futex wait/wake - std::println → cerr << - Remove #include , ; use , - Makefile: -std=c++23 → -std=c++11 Preserves Highway futex performance (sys ~1.7s, page-faults ~150K). Apple Clang on macOS CI does not support jthread/latch/println. --- Makefile | 2 +- src/peprocessor.cpp | 53 ++++++++++++++++++----------------- src/peprocessor.h | 4 +-- src/seprocessor.cpp | 68 ++++++++++++++++++++++++++------------------- src/seprocessor.h | 4 +-- 5 files changed, 73 insertions(+), 58 deletions(-) diff --git a/Makefile b/Makefile index d736f1e6..53af9110 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ TARGET := fastp BIN_TARGET := ${TARGET} CXX ?= g++ -CXXFLAGS := -std=c++23 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} +CXXFLAGS := -std=c++11 -pthread -g -O3 -MD -MP -I. -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) $(HWY_CFLAGS) $(ISAL_CFLAGS) $(DEFLATE_CFLAGS) ${CXXFLAGS} LIBS := -lisal -ldeflate -lhwy -lpthread PKG_LDFLAGS := $(HWY_LIBS) $(ISAL_LIBS) $(DEFLATE_LIBS) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 8280b597..4362df84 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -2,7 +2,6 @@ #include "hwy/contrib/thread_pool/futex.h" #include "fastqreader.h" #include -#include #include #include #include @@ -15,8 +14,8 @@ #include "htmlreporter.h" #include "polyx.h" -PairEndProcessor::PairEndProcessor(Options* opt) - : mWorkersLatch(opt->thread){ +PairEndProcessor::PairEndProcessor(Options* opt){ + mWorkersLatch.store(opt->thread); mOptions = opt; mLeftReaderFinished = false; mRightReaderFinished = false; @@ -194,8 +193,11 @@ bool PairEndProcessor::process(){ readerRight->join(); } - // Wait for all worker threads to finish, then signal writers to flush and exit - mWorkersLatch.wait(); + // Wait for all worker threads to finish using futex-based latch + while(mWorkersLatch.load(std::memory_order_acquire) > 0) { + uint32_t cur = mWorkersLatch.load(std::memory_order_acquire); + if(cur > 0) hwy::BlockUntilDifferent(cur, mWorkersLatch); + } if(mLeftWriter) mLeftWriter->setInputCompleted(); if(mRightWriter) @@ -254,49 +256,49 @@ bool PairEndProcessor::process(){ Stats* finalPostStats2 = Stats::merge(postStats2); FilterResult* finalFilterResult = FilterResult::merge(filterResults); - std::println(stderr, "Read1 before filtering:"); + cerr << "Read1 before filtering:" << endl; finalPreStats1->print(); - std::println(stderr, ""); - std::println(stderr, "Read2 before filtering:"); + cerr << endl; + cerr << "Read2 before filtering:" << endl; finalPreStats2->print(); - std::println(stderr, ""); + cerr << endl; if(!mOptions->merge.enabled) { - std::println(stderr, "Read1 after filtering:"); + cerr << "Read1 after filtering:" << endl; finalPostStats1->print(); - std::println(stderr, ""); - std::println(stderr, "Read2 after filtering:"); + cerr << endl; + cerr << "Read2 after filtering:" << endl; finalPostStats2->print(); } else { - std::println(stderr, "Merged and filtered:"); + cerr << "Merged and filtered:" << endl; finalPostStats1->print(); } - std::println(stderr, ""); - std::println(stderr, "Filtering result:"); + cerr << endl; + cerr << "Filtering result:" << endl; finalFilterResult->print(); double dupRate = 0.0; if(mOptions->duplicate.enabled) { dupRate = mDuplicate->getDupRate(); - std::println(stderr, ""); - std::println(stderr, "Duplication rate: {:.4}%", dupRate * 100.0); + cerr << endl; + cerr << "Duplication rate: " << dupRate * 100.0 << "%" << endl; } // insert size distribution int peakInsertSize = getPeakInsertSize(); - std::println(stderr, ""); - std::println(stderr, "Insert size peak (evaluated by paired-end reads): {}", peakInsertSize); + cerr << endl; + cerr << "Insert size peak (evaluated by paired-end reads): " << peakInsertSize << endl; if(mOptions->merge.enabled) { - std::println(stderr, ""); - std::println(stderr, "Read pairs merged: {}", finalFilterResult->mMergedPairs); + cerr << endl; + cerr << "Read pairs merged: " << finalFilterResult->mMergedPairs << endl; if(finalPostStats1->getReads() > 0) { double postMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPostStats1->getReads(); double preMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPreStats1->getReads(); - std::println(stderr, "% of original read pairs: {:.4}%", preMergedPercent); - std::println(stderr, "% in reads after filtering: {:.4}%", postMergedPercent); + cerr << "% of original read pairs: " << preMergedPercent << "%" << endl; + cerr << "% in reads after filtering: " << postMergedPercent << "%" << endl; } - std::println(stderr, ""); + cerr << endl; } // make JSON report @@ -1024,7 +1026,8 @@ void PairEndProcessor::processorTask(ThreadConfig* config) inputLeft->setConsumerFinished(); inputRight->setConsumerFinished(); - mWorkersLatch.count_down(); + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) + hwy::WakeAll(mWorkersLatch); if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed"; loginfo(msg); diff --git a/src/peprocessor.h b/src/peprocessor.h index e957edea..e8e76dbc 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -17,7 +17,7 @@ #include "writerthread.h" #include "duplicate.h" #include "readpool.h" -#include +#include using namespace std; @@ -47,7 +47,7 @@ class PairEndProcessor{ private: atomic_bool mLeftReaderFinished; atomic_bool mRightReaderFinished; - std::latch mWorkersLatch; + std::atomic mWorkersLatch; Options* mOptions; Filter* mFilter; UmiProcessor* mUmiProcessor; diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 8463a35f..a372c20d 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,6 +1,5 @@ #include "seprocessor.h" #include "hwy/contrib/thread_pool/futex.h" -#include #include "fastqreader.h" #include #include @@ -14,8 +13,8 @@ #include "adaptertrimmer.h" #include "polyx.h" -SingleEndProcessor::SingleEndProcessor(Options* opt) - : mWorkersLatch(opt->thread){ +SingleEndProcessor::SingleEndProcessor(Options* opt){ + mWorkersLatch.store(opt->thread); mOptions = opt; mReaderFinished = false; mFilter = new Filter(opt); @@ -90,37 +89,41 @@ bool SingleEndProcessor::process(){ initConfig(configs[t]); } - // Reader thread (jthread auto-joins on destruction) - std::jthread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); + std::thread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); - // Worker threads - std::vector workers; - workers.reserve(mOptions->thread); - for(int t=0; tthread; t++) - workers.emplace_back(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); + std::thread** threads = new thread*[mOptions->thread]; + for(int t=0; tthread; t++){ + threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); + } - // Writer threads (conditional) - std::optional leftWriterThread; - std::optional failedWriterThread; + std::thread* leftWriterThread = NULL; + std::thread* failedWriterThread = NULL; if(mLeftWriter) - leftWriterThread.emplace(std::bind(&SingleEndProcessor::writerTask, this, mLeftWriter)); + leftWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mLeftWriter)); if(mFailedWriter) - failedWriterThread.emplace(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); + failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); - // Wait for reader to finish, then workers, then signal writers readerThread.join(); - mWorkersLatch.wait(); + // Wait for all worker threads using futex-based latch + while(mWorkersLatch.load(std::memory_order_acquire) > 0) { + uint32_t cur = mWorkersLatch.load(std::memory_order_acquire); + if(cur > 0) hwy::BlockUntilDifferent(cur, mWorkersLatch); + } if(mLeftWriter) mLeftWriter->setInputCompleted(); if(mFailedWriter) mFailedWriter->setInputCompleted(); - workers.clear(); + for(int t=0; tthread; t++){ + threads[t]->join(); + } if(!mOptions->split.enabled) { - leftWriterThread.reset(); - failedWriterThread.reset(); + if(leftWriterThread) + leftWriterThread->join(); + if(failedWriterThread) + failedWriterThread->join(); } if(mOptions->verbose) @@ -145,21 +148,21 @@ bool SingleEndProcessor::process(){ postStats.push_back(configs[t]->getPostStats1()); } - std::println(stderr, "Read1 before filtering:"); + cerr << "Read1 before filtering:" << endl; finalPreStats->print(); - std::println(stderr, ""); - std::println(stderr, "Read1 after filtering:"); + cerr << endl; + cerr << "Read1 after filtering:" << endl; finalPostStats->print(); - std::println(stderr, ""); - std::println(stderr, "Filtering result:"); + cerr << endl; + cerr << "Filtering result:" << endl; finalFilterResult->print(); double dupRate = 0.0; if(mOptions->duplicate.enabled) { dupRate = mDuplicate->getDupRate(); - std::println(stderr, ""); - std::println(stderr, "Duplication rate (may be overestimated since this is SE data): {:.4}%", dupRate * 100.0); + cerr << endl; + cerr << "Duplication rate (may be overestimated since this is SE data): " << dupRate * 100.0 << "%" << endl; } // make JSON report @@ -174,6 +177,8 @@ bool SingleEndProcessor::process(){ // clean up for(int t=0; tthread; t++){ + delete threads[t]; + threads[t] = NULL; delete configs[t]; configs[t] = NULL; } @@ -182,8 +187,14 @@ bool SingleEndProcessor::process(){ delete finalPostStats; delete finalFilterResult; + delete[] threads; delete[] configs; + if(leftWriterThread) + delete leftWriterThread; + if(failedWriterThread) + delete failedWriterThread; + if(!mOptions->split.enabled) closeOutput(); @@ -453,7 +464,8 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) } input->setConsumerFinished(); - mWorkersLatch.count_down(); + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) + hwy::WakeAll(mWorkersLatch); if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " finished"; diff --git a/src/seprocessor.h b/src/seprocessor.h index 962db7ca..12dd6ac4 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -17,7 +17,7 @@ #include "duplicate.h" #include "singleproducersingleconsumerlist.h" #include "readpool.h" -#include +#include using namespace std; @@ -42,7 +42,7 @@ class SingleEndProcessor{ private: Options* mOptions; atomic_bool mReaderFinished; - std::latch mWorkersLatch; + std::atomic mWorkersLatch; Filter* mFilter; UmiProcessor* mUmiProcessor; WriterThread* mLeftWriter; From 11c02490ae342f079289d1d4749ef5c341413433 Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 22:40:30 -0700 Subject: [PATCH 14/20] fix: writer thread deadlock with BlockUntilDifferent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BlockUntilDifferent(prev, atom) only returns when atom != prev. When mBufferLength stays 0, WakeAll cannot break the loop. Fix: use separate mWriterNotify counter for writer thread blocking. - input(): increments mWriterNotify + WakeAll to wake writer - setInputCompleted(): increments mWriterNotify + WakeAll to wake writer - output(): blocks on mWriterNotify instead of mBufferLength Also move setInputCompleted() back to last worker thread (not main thread), matching the original master pattern. This avoids a race where main thread waits on latch while writer is already blocked. Verified: SE smoke ✅, PE smoke ✅, PE benchmark wall -10%, sys -71%. --- src/peprocessor.cpp | 38 ++++++++++++++++---------------------- src/seprocessor.cpp | 19 ++++++------------- src/writerthread.cpp | 29 +++++++++++++++++++---------- src/writerthread.h | 1 + 4 files changed, 42 insertions(+), 45 deletions(-) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 4362df84..6431cce2 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -193,26 +193,6 @@ bool PairEndProcessor::process(){ readerRight->join(); } - // Wait for all worker threads to finish using futex-based latch - while(mWorkersLatch.load(std::memory_order_acquire) > 0) { - uint32_t cur = mWorkersLatch.load(std::memory_order_acquire); - if(cur > 0) hwy::BlockUntilDifferent(cur, mWorkersLatch); - } - if(mLeftWriter) - mLeftWriter->setInputCompleted(); - if(mRightWriter) - mRightWriter->setInputCompleted(); - if(mUnpairedLeftWriter) - mUnpairedLeftWriter->setInputCompleted(); - if(mUnpairedRightWriter) - mUnpairedRightWriter->setInputCompleted(); - if(mMergedWriter) - mMergedWriter->setInputCompleted(); - if(mFailedWriter) - mFailedWriter->setInputCompleted(); - if(mOverlappedWriter) - mOverlappedWriter->setInputCompleted(); - for(int t=0; tthread; t++){ threads[t]->join(); } @@ -1026,8 +1006,22 @@ void PairEndProcessor::processorTask(ThreadConfig* config) inputLeft->setConsumerFinished(); inputRight->setConsumerFinished(); - if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) - hwy::WakeAll(mWorkersLatch); + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) { + if(mLeftWriter) + mLeftWriter->setInputCompleted(); + if(mRightWriter) + mRightWriter->setInputCompleted(); + if(mUnpairedLeftWriter) + mUnpairedLeftWriter->setInputCompleted(); + if(mUnpairedRightWriter) + mUnpairedRightWriter->setInputCompleted(); + if(mMergedWriter) + mMergedWriter->setInputCompleted(); + if(mFailedWriter) + mFailedWriter->setInputCompleted(); + if(mOverlappedWriter) + mOverlappedWriter->setInputCompleted(); + } if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed"; loginfo(msg); diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index a372c20d..4613cf90 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -104,17 +104,6 @@ bool SingleEndProcessor::process(){ failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); readerThread.join(); - - // Wait for all worker threads using futex-based latch - while(mWorkersLatch.load(std::memory_order_acquire) > 0) { - uint32_t cur = mWorkersLatch.load(std::memory_order_acquire); - if(cur > 0) hwy::BlockUntilDifferent(cur, mWorkersLatch); - } - if(mLeftWriter) - mLeftWriter->setInputCompleted(); - if(mFailedWriter) - mFailedWriter->setInputCompleted(); - for(int t=0; tthread; t++){ threads[t]->join(); } @@ -464,8 +453,12 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) } input->setConsumerFinished(); - if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) - hwy::WakeAll(mWorkersLatch); + if(mWorkersLatch.fetch_sub(1, std::memory_order_release) - 1 == 0) { + if(mLeftWriter) + mLeftWriter->setInputCompleted(); + if(mFailedWriter) + mFailedWriter->setInputCompleted(); + } if(mOptions->verbose) { string msg = "thread " + to_string(config->getThreadId() + 1) + " finished"; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 5749411b..a27d7b94 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -43,11 +43,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ } mWorkingBufferList = 0; mBufferLength = 0; + mWriterNotify = 0; } else { initWriter(filename, isSTDOUT); initBufferLists(); mWorkingBufferList = 0; mBufferLength = 0; + mWriterNotify = 0; } } @@ -71,6 +73,9 @@ bool WriterThread::setInputCompleted() { for(int t=0; tthread; t++) { mBufferLists[t]->setProducerFinished(); } + // Wake the writer thread blocked in output() so it re-checks isCompleted() + mWriterNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mWriterNotify); return true; } @@ -99,16 +104,19 @@ void WriterThread::output(){ if (mPwriteMode) return; // no-op SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { - uint32_t cur = mBufferLength.load(std::memory_order_acquire); - if(cur == 0) hwy::BlockUntilDifferent(cur, mBufferLength); - } else { - string* str = list->consume(); - mWriter1->write(str->data(), str->length()); - delete str; - mBufferLength.fetch_sub(1, std::memory_order_release); - hwy::WakeAll(mBufferLength); - mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; + if(mInputCompleted) return; + // Current slot has no data yet. Block until a producer or + // setInputCompleted() wakes us via mWriterNotify. + uint32_t cur = mWriterNotify.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mWriterNotify); + // After wake, return to writerTask loop which re-checks isCompleted() + return; } + string* str = list->consume(); + mWriter1->write(str->data(), str->length()); + delete str; + mBufferLength.fetch_sub(1, std::memory_order_release); + mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; } void WriterThread::input(int tid, string* data) { @@ -118,7 +126,8 @@ void WriterThread::input(int tid, string* data) { } mBufferLists[tid]->produce(data); mBufferLength.fetch_add(1, std::memory_order_release); - hwy::WakeAll(mBufferLength); + mWriterNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mWriterNotify); } void WriterThread::inputPwrite(int tid, string* data) { diff --git a/src/writerthread.h b/src/writerthread.h index f57313aa..699740e2 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -60,6 +60,7 @@ class WriterThread{ bool mInputCompleted; std::atomic mBufferLength; + std::atomic mWriterNotify; // incremented to wake writer thread SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; From f5f4bca94ac3989f14ce407ed4e7d1de35bf0bfb Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Thu, 16 Apr 2026 23:55:19 -0700 Subject: [PATCH 15/20] =?UTF-8?q?perf:=20replace=20pwrite=20ring=20sleep?= =?UTF-8?q?=5Ffor(1=C2=B5s)=20spin=20with=20Highway=20futex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pwrite ring buffer used std::this_thread::sleep_for(1µs) to poll for the previous slot's published_seq. Replace with hwy::BlockUntilDifferent + hwy::WakeAll for precise wakeup. Changes: - OffsetSlot::published_seq: atomic → atomic (seq values are small; uint32_t required by Highway futex API) - Wait loop: sleep_for(1µs) → BlockUntilDifferent(cur, published_seq) - Publish: store + WakeAll to notify waiting workers --- src/writerthread.cpp | 14 ++++++++------ src/writerthread.h | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index a27d7b94..99cefedf 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -6,7 +6,6 @@ #include #include #include -#include WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mOptions = opt; @@ -148,13 +147,15 @@ void WriterThread::inputPwrite(int tid, string* data) { size_t seq = mNextSeq[tid].load(std::memory_order_relaxed); - // Wait for previous batch's cumulative offset. - // Sleep yields CPU to prevent livelock under contention. + // Wait for previous batch's cumulative offset using futex. size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); - while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); + uint32_t target = static_cast(seq - 1); + while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != target) { + uint32_t cur = mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire); + if (cur != target) + hwy::BlockUntilDifferent(cur, mOffsetRing[prevSlot].published_seq); } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -162,7 +163,8 @@ void WriterThread::inputPwrite(int tid, string* data) { // Publish offset BEFORE pwrite — next worker starts immediately size_t mySlot = seq & (OFFSET_RING_SIZE - 1); mOffsetRing[mySlot].cumulative_offset.store(offset + wsize, std::memory_order_relaxed); - mOffsetRing[mySlot].published_seq.store(seq, std::memory_order_release); + mOffsetRing[mySlot].published_seq.store(static_cast(seq), std::memory_order_release); + hwy::WakeAll(mOffsetRing[mySlot].published_seq); // pwrite (concurrent with other workers on non-overlapping regions) if (wsize > 0) { diff --git a/src/writerthread.h b/src/writerthread.h index 699740e2..4e98ad1f 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -19,7 +19,7 @@ static constexpr int OFFSET_RING_SIZE = 512; struct alignas(64) OffsetSlot { std::atomic cumulative_offset{0}; - std::atomic published_seq{SIZE_MAX}; + std::atomic published_seq{UINT32_MAX}; }; class WriterThread{ From 2f348b3a77e10265f23c0fb0f3e9541c97edd94c Mon Sep 17 00:00:00 2001 From: Kim Yann Date: Fri, 17 Apr 2026 00:01:36 -0700 Subject: [PATCH 16/20] perf: replace bgzf mutex+condvar with Highway futex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all std::mutex + std::condition_variable synchronization in BgzfMtReader with lock-free Highway futex primitives: - Per-slot atomic state: BlockUntilDifferent waits for state transitions (FREE→COMPRESSED→DECOMPRESSING→READY→FREE) - Global mSlotNotify counter: incremented on every state transition, used by decompressor threads to block when no COMPRESSED slots are available (replaces condvar broadcast) - Remove / includes from pe/seprocessor.h and writerthread.h (no longer used anywhere in the pipeline) This eliminates all kernel mutex contention in the BGZF decompression pipeline, which is the hot path for .gz input files. --- src/bgzf.h | 120 +++++++++++++++++++++++++-------------------- src/peprocessor.h | 2 - src/seprocessor.h | 2 - src/writerthread.h | 1 - 4 files changed, 68 insertions(+), 57 deletions(-) diff --git a/src/bgzf.h b/src/bgzf.h index 9b02852a..8cbf43e1 100644 --- a/src/bgzf.h +++ b/src/bgzf.h @@ -6,10 +6,9 @@ #include #include #include -#include -#include #include #include +#include static const int BGZF_HEADER_SIZE = 18; static const int BGZF_MAX_BLOCK_SIZE = 65536; @@ -32,44 +31,46 @@ static inline uint32_t bgzfBlockSize(const unsigned char* h) { // Parallel BGZF decompression with fixed thread pool. // All decompress threads start upfront; ring buffer provides backpressure. -// Slot lifecycle: FREE → COMPRESSED → DECOMPRESSING → READY → FREE +// Slot lifecycle: FREE -> COMPRESSED -> DECOMPRESSING -> READY -> FREE +// Synchronization: Highway futex (BlockUntilDifferent/WakeAll) on per-slot +// atomic state, plus a global mSlotNotify counter for decompressor wakeup. class BgzfMtReader { - enum SlotState { FREE, COMPRESSED, DECOMPRESSING, READY, DONE }; + static const uint32_t STATE_FREE = 0; + static const uint32_t STATE_COMPRESSED = 1; + static const uint32_t STATE_DECOMPRESSING = 2; + static const uint32_t STATE_READY = 3; + static const uint32_t STATE_DONE = 4; struct alignas(64) Slot { unsigned char comp[BGZF_MAX_BLOCK_SIZE]; unsigned char decomp[BGZF_MAX_BLOCK_SIZE]; int compLen; int decompLen; - std::atomic state{FREE}; + std::atomic state{STATE_FREE}; }; public: - // threadBudget: max decompress threads allowed for this reader. - // 0 = auto (use half of available CPU cores). - // Caller should compute: (hardware_concurrency - workers - readers - writers) / num_gz_inputs BgzfMtReader(FILE* fp, int threadBudget = 0) : mFp(fp), mConsumeIdx(0), mConsumeOffset(0), mProduceIdx(0), mStop(false) { + mSlotNotify.store(0, std::memory_order_relaxed); int cpus = std::thread::hardware_concurrency(); if (cpus < 2) cpus = 2; mMaxPool = (threadBudget > 0) ? threadBudget : std::max(1, cpus / 2); mRingSize = mMaxPool * 4; if (mRingSize < 16) mRingSize = 16; - if (mRingSize > 64) mRingSize = 64; // cap at ~8MB per reader + if (mRingSize > 64) mRingSize = 64; mSlots = new Slot[mRingSize]; - // Start all decompress threads upfront — ring buffer handles backpressure for (int i = 0; i < mMaxPool; i++) mPool.push_back(new std::thread(&BgzfMtReader::decompWorker, this)); mReaderThread = new std::thread(&BgzfMtReader::readerLoop, this); } ~BgzfMtReader() { - mStop = true; - mDecompCv.notify_all(); - mProduceCv.notify_all(); - mConsumeCv.notify_all(); + mStop.store(true, std::memory_order_release); + // Wake all waiters so they see mStop + notifyAll(); if (mReaderThread) { mReaderThread->join(); delete mReaderThread; } for (auto* t : mPool) { t->join(); delete t; } delete[] mSlots; @@ -79,14 +80,13 @@ class BgzfMtReader { int filled = 0; while (filled < outBufSize) { Slot& s = mSlots[mConsumeIdx % mRingSize]; - { - std::unique_lock lk(mConsumeMtx); - mConsumeCv.wait(lk, [&]() { - SlotState v = s.state.load(std::memory_order_acquire); - return v == READY || v == DONE; - }); + // Wait for slot to become READY or DONE + while (true) { + uint32_t v = s.state.load(std::memory_order_acquire); + if (v == STATE_READY || v == STATE_DONE) break; + hwy::BlockUntilDifferent(v, s.state); } - if (s.state.load(std::memory_order_acquire) == DONE) break; + if (s.state.load(std::memory_order_acquire) == STATE_DONE) break; int avail = s.decompLen - mConsumeOffset; int tocopy = (outBufSize - filled) < avail ? (outBufSize - filled) : avail; @@ -95,27 +95,39 @@ class BgzfMtReader { mConsumeOffset += tocopy; if (mConsumeOffset >= s.decompLen) { - s.state.store(FREE, std::memory_order_release); + s.state.store(STATE_FREE, std::memory_order_release); + hwy::WakeAll(s.state); mConsumeOffset = 0; mConsumeIdx++; - mProduceCv.notify_one(); + // Notify producer that a slot is free + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } return filled; } private: + void notifyAll() { + // Wake all threads blocked on slot states or mSlotNotify + for (int i = 0; i < mRingSize; i++) { + hwy::WakeAll(mSlots[i].state); + } + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); + } + void readerLoop() { unsigned char header[BGZF_HEADER_SIZE]; - while (!mStop) { + while (!mStop.load(std::memory_order_acquire)) { Slot& s = mSlots[mProduceIdx % mRingSize]; - { - std::unique_lock lk(mProduceMtx); - mProduceCv.wait(lk, [&]() { - return s.state.load(std::memory_order_acquire) == FREE || mStop; - }); - if (mStop) break; + // Wait for slot to become FREE + while (true) { + if (mStop.load(std::memory_order_acquire)) return; + uint32_t v = s.state.load(std::memory_order_acquire); + if (v == STATE_FREE) break; + hwy::BlockUntilDifferent(v, s.state); } size_t n = fread(header, 1, BGZF_HEADER_SIZE, mFp); @@ -134,22 +146,24 @@ class BgzfMtReader { } s.compLen = bsize; - s.state.store(COMPRESSED, std::memory_order_release); + s.state.store(STATE_COMPRESSED, std::memory_order_release); + hwy::WakeAll(s.state); mProduceIdx++; - mDecompCv.notify_all(); + // Notify decompressors that a slot has data + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } void decompWorker() { - while (!mStop) { + while (!mStop.load(std::memory_order_acquire)) { Slot* target = nullptr; - { - std::unique_lock lk(mDecompMtx); - mDecompCv.wait(lk, [&]() { - return mStop.load(std::memory_order_relaxed) || claimSlot(&target); - }); - if (mStop && !target) return; - if (!target) continue; + if (!claimSlot(&target)) { + if (mStop.load(std::memory_order_acquire)) return; + // No slot available — block until state changes somewhere + uint32_t cur = mSlotNotify.load(std::memory_order_acquire); + hwy::BlockUntilDifferent(cur, mSlotNotify); + continue; } struct inflate_state ist; @@ -162,8 +176,11 @@ class BgzfMtReader { int ret = isal_inflate_stateless(&ist); target->decompLen = (ret == ISAL_DECOMP_OK) ? (int)ist.total_out : 0; - target->state.store(READY, std::memory_order_release); - mConsumeCv.notify_one(); + target->state.store(STATE_READY, std::memory_order_release); + hwy::WakeAll(target->state); + // Notify consumer that data is ready + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } } @@ -172,8 +189,8 @@ class BgzfMtReader { int tail = mProduceIdx; for (int i = head; i < tail; i++) { Slot& s = mSlots[i % mRingSize]; - SlotState expected = COMPRESSED; - if (s.state.compare_exchange_strong(expected, DECOMPRESSING, + uint32_t expected = STATE_COMPRESSED; + if (s.state.compare_exchange_strong(expected, STATE_DECOMPRESSING, std::memory_order_acq_rel)) { *out = &s; return true; @@ -183,25 +200,24 @@ class BgzfMtReader { } void markDone(Slot& s) { - s.state.store(DONE, std::memory_order_release); - mConsumeCv.notify_all(); - mDecompCv.notify_all(); + s.state.store(STATE_DONE, std::memory_order_release); + hwy::WakeAll(s.state); + mSlotNotify.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mSlotNotify); } - FILE* mFp; // not owned — caller must keep open for lifetime of BgzfMtReader + FILE* mFp; Slot* mSlots; int mRingSize; std::atomic mConsumeIdx; - int mConsumeOffset; // only accessed by consumer thread + int mConsumeOffset; std::atomic mProduceIdx; int mMaxPool; std::atomic mStop; + std::atomic mSlotNotify; // monotonic counter for cross-thread wakeup std::thread* mReaderThread; std::vector mPool; - - std::mutex mConsumeMtx, mProduceMtx, mDecompMtx; - std::condition_variable mConsumeCv, mProduceCv, mDecompCv; }; #endif diff --git a/src/peprocessor.h b/src/peprocessor.h index e8e76dbc..4149d82a 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -6,8 +6,6 @@ #include #include "read.h" #include -#include -#include #include #include "options.h" #include "threadconfig.h" diff --git a/src/seprocessor.h b/src/seprocessor.h index 12dd6ac4..73794836 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -6,8 +6,6 @@ #include #include "read.h" #include -#include -#include #include #include "options.h" #include "threadconfig.h" diff --git a/src/writerthread.h b/src/writerthread.h index 4e98ad1f..4beffa7c 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -9,7 +9,6 @@ #include "options.h" #include #include "hwy/contrib/thread_pool/futex.h" -#include #include #include "singleproducersingleconsumerlist.h" From 5c0c444cb91c7e080dd77c3dec78ab12eb4cf783 Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Thu, 23 Apr 2026 20:30:42 +0800 Subject: [PATCH 17/20] fix: eliminate data races in writer thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make mInputCompleted atomic with acquire/release ordering to fix a race between producer and writer threads. Replace pwrite ring's published_seq with a monotonic generation counter to prevent ABA on slot reuse. Wake producers after buffer-length decrement so they unblock promptly. 🐘 Generated with Crush Co-Authored-By: Crush --- src/writerthread.cpp | 25 +++++++++++++------------ src/writerthread.h | 4 ++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 99cefedf..e46d9d75 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -10,7 +10,7 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mOptions = opt; mWriter1 = NULL; - mInputCompleted = false; + mInputCompleted.store(false, std::memory_order_relaxed); mFilename = filename; mPwriteMode = !isSTDOUT && ends_with(filename, ".gz") && mOptions->thread > 1; @@ -59,16 +59,16 @@ WriterThread::~WriterThread() { bool WriterThread::isCompleted() { if (mPwriteMode) return true; // no writer thread needed - return mInputCompleted && (mBufferLength==0); + return mInputCompleted.load(std::memory_order_acquire) && (mBufferLength==0); } bool WriterThread::setInputCompleted() { if (mPwriteMode) { setInputCompletedPwrite(); - mInputCompleted = true; + mInputCompleted.store(true, std::memory_order_release); return true; } - mInputCompleted = true; + mInputCompleted.store(true, std::memory_order_release); for(int t=0; tthread; t++) { mBufferLists[t]->setProducerFinished(); } @@ -103,7 +103,7 @@ void WriterThread::output(){ if (mPwriteMode) return; // no-op SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { - if(mInputCompleted) return; + if(mInputCompleted.load(std::memory_order_acquire)) return; // Current slot has no data yet. Block until a producer or // setInputCompleted() wakes us via mWriterNotify. uint32_t cur = mWriterNotify.load(std::memory_order_acquire); @@ -115,6 +115,7 @@ void WriterThread::output(){ mWriter1->write(str->data(), str->length()); delete str; mBufferLength.fetch_sub(1, std::memory_order_release); + hwy::WakeAll(mBufferLength); mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread; } @@ -151,11 +152,11 @@ void WriterThread::inputPwrite(int tid, string* data) { size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); - uint32_t target = static_cast(seq - 1); - while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != target) { - uint32_t cur = mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire); - if (cur != target) - hwy::BlockUntilDifferent(cur, mOffsetRing[prevSlot].published_seq); + uint32_t needGen = static_cast((seq - 1) / OFFSET_RING_SIZE + 1); + while (mOffsetRing[prevSlot].generation.load(std::memory_order_acquire) < needGen) { + uint32_t cur = mOffsetRing[prevSlot].generation.load(std::memory_order_acquire); + if (cur < needGen) + hwy::BlockUntilDifferent(cur, mOffsetRing[prevSlot].generation); } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -163,8 +164,8 @@ void WriterThread::inputPwrite(int tid, string* data) { // Publish offset BEFORE pwrite — next worker starts immediately size_t mySlot = seq & (OFFSET_RING_SIZE - 1); mOffsetRing[mySlot].cumulative_offset.store(offset + wsize, std::memory_order_relaxed); - mOffsetRing[mySlot].published_seq.store(static_cast(seq), std::memory_order_release); - hwy::WakeAll(mOffsetRing[mySlot].published_seq); + mOffsetRing[mySlot].generation.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mOffsetRing[mySlot].generation); // pwrite (concurrent with other workers on non-overlapping regions) if (wsize > 0) { diff --git a/src/writerthread.h b/src/writerthread.h index 4beffa7c..ba4d4e24 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -18,7 +18,7 @@ static constexpr int OFFSET_RING_SIZE = 512; struct alignas(64) OffsetSlot { std::atomic cumulative_offset{0}; - std::atomic published_seq{UINT32_MAX}; + std::atomic generation{0}; // bumped each time slot is published }; class WriterThread{ @@ -57,7 +57,7 @@ class WriterThread{ Options* mOptions; string mFilename; - bool mInputCompleted; + std::atomic mInputCompleted; std::atomic mBufferLength; std::atomic mWriterNotify; // incremented to wake writer thread SingleProducerSingleConsumerList** mBufferLists; From c093f882ff5af76584823173a20170d208b6e77a Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Fri, 24 Apr 2026 14:50:51 +0800 Subject: [PATCH 18/20] fix: break reader/worker/writer circular deadlock and close lost-wakeup races MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three connected thread-synchronization bugs caused fastp to hang under -w>=23 + plain (non-gz) output + --adapter_fasta. 1. Mid-flight deadlock: reader gated on mLeftWriter->waitForBufferBelow, writer drained mBufferLists in strict round-robin. When one worker ran slightly behind, its per-worker slot stayed empty while other slots piled up, pushing mBufferLength above the limit. The reader then halted at waitForBufferBelow, so the slow worker never received more input, its slot never filled, the writer stayed blocked, and every thread deadlocked. Confirmed by stack sample: 24 workers in peprocessor.cpp:1003, 2 readers in peprocessor.cpp:807, 2 writers in writerthread.cpp:110. Removed the writer-buffer backpressure — the pack-level backpressure (mLeftPackReadCounter - mPackProcessedCounter) already bounds in-flight memory without creating the cycle. 2. Reader-shutdown lost wakeup: readerTask/interleavedReaderTask/SE readerTask called setProducerFinished() without bumping mPackProducedCounter. A worker that had just snapshotted the counter in BlockUntilDifferent would miss the completion signal and sleep forever. Added a counter bump + WakeAll after setProducerFinished. 3. Writer-shutdown lost wakeup: WriterThread::output() checked mInputCompleted before snapshotting mWriterNotify. If setInputCompleted ran between the check and BlockUntilDifferent, cur captured the post-bump value and the writer blocked forever. Swapped the order so the snapshot is taken first; any subsequent bump is then guaranteed to make cur \!= current and return immediately. Verified on macOS ARM64 with 10M simulated pairs, -w 24, plain fq, --adapter_fasta: previously hung indefinitely, now completes in 38s. 🤖 Generated with Claude Code Co-Authored-By: Claude --- src/peprocessor.cpp | 32 ++++++++++++++++++++------------ src/seprocessor.cpp | 14 +++++++++----- src/writerthread.cpp | 10 +++++++--- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 6431cce2..6dbe7e2f 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -801,12 +801,14 @@ void PairEndProcessor::readerTask(bool isLeft) } } readNum += count; - // if the writer threads are far behind this producer, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - if(mLeftWriter) mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); - if(mRightWriter) mRightWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); - } + // NOTE: mLeftWriter->waitForBufferBelow() used to be gated here. It + // caused a mid-flight deadlock under -w>=23 + plain fq + adapter_fasta: + // writer consumes per-worker lists in strict round-robin, so one slow + // worker leaves its slot empty while others pile up, mBufferLength + // stays above the limit, the reader halts here, the slow worker never + // receives new input, its slot stays empty, and every thread blocks. + // Pack-level backpressure (mLeftPackReadCounter - mPackProcessedCounter + // above) already bounds in-flight memory without the cycle. // reset count to 0 count = 0; // re-evaluate split size @@ -832,6 +834,11 @@ void PairEndProcessor::readerTask(bool isLeft) else mRightInputLists[t]->setProducerFinished(); } + // Wake workers that may have latched a mPackProducedCounter snapshot just + // before setProducerFinished() ran; setProducerFinished does not bump the + // counter so without this they would miss the completion signal. + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); if(mOptions->verbose) { if(isLeft) { @@ -935,12 +942,9 @@ void PairEndProcessor::interleavedReaderTask() slept++; } readNum += count; - // if the writer threads are far behind this producer, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - if(mLeftWriter) mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); - if(mRightWriter) mRightWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); - } + // NOTE: see readerTask() for why we no longer gate the reader on + // mLeftWriter->bufferLength here — round-robin writer + tight buffer + // limit caused a mid-flight deadlock. // reset count to 0 count = 0; // re-evaluate split size @@ -966,6 +970,10 @@ void PairEndProcessor::interleavedReaderTask() mLeftInputLists[t]->setProducerFinished(); mRightInputLists[t]->setProducerFinished(); } + // Wake any worker that snapshot mPackProducedCounter just before the + // setProducerFinished() above — without a counter bump they would miss it. + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); if(mOptions->verbose) { loginfo("interleaved: loading completed with " + to_string(mLeftPackReadCounter) + " packs"); diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 4613cf90..cb88c018 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -388,11 +388,11 @@ void SingleEndProcessor::readerTask() slept++; } readNum += count; - // if the writer threads are far behind this reader, sleep and wait - // check this only when necessary - if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) { - mLeftWriter->waitForBufferBelow(PACK_IN_MEM_LIMIT); - } + // NOTE: writer-buffer backpressure (waitForBufferBelow) removed — + // it formed a circular wait with round-robin writer consumption and + // deadlocked under high thread counts. Pack-level backpressure + // (mPackReadCounter - mPackProcessedCounter above) still bounds + // in-flight memory. // reset count to 0 count = 0; // re-evaluate split size @@ -414,6 +414,10 @@ void SingleEndProcessor::readerTask() for(int t=0; tthread; t++) mInputLists[t]->setProducerFinished(); + // Wake any worker that snapshot mPackProducedCounter just before the + // setProducerFinished() above — without a counter bump they would miss it. + mPackProducedCounter.fetch_add(1, std::memory_order_release); + hwy::WakeAll(mPackProducedCounter); //std::unique_lock lock(mRepo.readCounterMtx); mReaderFinished.store(true, std::memory_order_release); diff --git a/src/writerthread.cpp b/src/writerthread.cpp index e46d9d75..23942475 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -103,10 +103,14 @@ void WriterThread::output(){ if (mPwriteMode) return; // no-op SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { - if(mInputCompleted.load(std::memory_order_acquire)) return; - // Current slot has no data yet. Block until a producer or - // setInputCompleted() wakes us via mWriterNotify. + // Snapshot mWriterNotify BEFORE checking mInputCompleted to avoid a + // lost-wakeup race: if setInputCompleted() runs between the state check + // and BlockUntilDifferent, it bumps mWriterNotify once and no further + // bumps arrive. Capturing cur first guarantees the bump is observable + // (cur != current value), so BlockUntilDifferent returns immediately. uint32_t cur = mWriterNotify.load(std::memory_order_acquire); + if(list->canBeConsumed()) return; // producer raced in; let outer loop consume + if(mInputCompleted.load(std::memory_order_acquire)) return; hwy::BlockUntilDifferent(cur, mWriterNotify); // After wake, return to writerTask loop which re-checks isCompleted() return; From f01c530c0ffe2285c95df1ebb2a71c06814763da Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Wed, 29 Apr 2026 22:37:30 +0800 Subject: [PATCH 19/20] fix: eliminate macOS build warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use standard C++ containers and bounded formatting so clean rebuilds stay warning-free under clang. 💘 Generated with Crush Co-Authored-By: Crush --- src/htmlreporter.cpp | 3 ++- src/matcher.cpp | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/htmlreporter.cpp b/src/htmlreporter.cpp index 697f232b..f6c04850 100644 --- a/src/htmlreporter.cpp +++ b/src/htmlreporter.cpp @@ -1,5 +1,6 @@ #include "htmlreporter.h" #include +#include #include #include "knownadapters.h" @@ -614,7 +615,7 @@ const string HtmlReporter::getCurrentSystemTime() auto tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); struct tm* ptm = localtime(&tt); char date[60] = {0}; - sprintf(date, "%d-%02d-%02d %02d:%02d:%02d", + snprintf(date, sizeof(date), "%d-%02d-%02d %02d:%02d:%02d", (int)ptm->tm_year + 1900,(int)ptm->tm_mon + 1,(int)ptm->tm_mday, (int)ptm->tm_hour,(int)ptm->tm_min,(int)ptm->tm_sec); return std::string(date); diff --git a/src/matcher.cpp b/src/matcher.cpp index c438ca6c..701660ef 100644 --- a/src/matcher.cpp +++ b/src/matcher.cpp @@ -1,4 +1,5 @@ #include "matcher.h" +#include Matcher::Matcher(){ } @@ -9,8 +10,8 @@ Matcher::~Matcher(){ bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) { // accumlated mismatches from left/right - int accMismatchFromLeft[cmplen]; - int accMismatchFromRight[cmplen]; + vector accMismatchFromLeft(cmplen); + vector accMismatchFromRight(cmplen); // accMismatchFromLeft[0]: head vs. head // accMismatchFromRight[cmplen-1]: tail vs. tail @@ -55,8 +56,8 @@ bool Matcher::matchWithOneInsertion(const char* insData, const char* normalData, int Matcher::diffWithOneInsertion(const char* insData, const char* normalData, int cmplen, int diffLimit) { // accumlated mismatches from left/right - int accMismatchFromLeft[cmplen]; - int accMismatchFromRight[cmplen]; + vector accMismatchFromLeft(cmplen); + vector accMismatchFromRight(cmplen); // accMismatchFromLeft[0]: head vs. head // accMismatchFromRight[cmplen-1]: tail vs. tail From 271acddd06c55b1490eb817864c0c8c00b1a5a16 Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Sat, 2 May 2026 10:24:38 +0800 Subject: [PATCH 20/20] fix(spsc): eliminate all TSan-confirmed data races in SPSC queue Three layered fixes in singleproducersingleconsumerlist.h: 1. nextItem -> std::atomic with release store in produce() and acquire-gated read in consume(): prevents producer/consumer racing on nextItem when consumer arrives via h==tail path (which bypasses the nextItemReady acquire that normally establishes HB). 2. head.load(relaxed) -> head.load(acquire) in produce(): prevents stale head read causing else-branch to link new item to an already- consumed tail -- both a real data-loss bug and a TSan race on tail->nextItem (consumer reads it while producer writes it). 3. tail.store(relaxed) -> tail.store(release) in first-item branch: ensures TSan can track the direct release-acquire edge on tail (indirect HB through a different atomic is not followed by TSan vector-clock implementation). Also: remove UB memset on std::atomic objects; rely on constructors. H5 (uninitialised value in default ctor) fixed via : value() init. Verified: 3x TSan runs with 8 worker threads, 0 warnings. Co-Authored-By: Claude Sonnet 4.6 --- src/singleproducersingleconsumerlist.h | 51 +++++++++++++++----------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h index 5f19852c..067b7053 100644 --- a/src/singleproducersingleconsumerlist.h +++ b/src/singleproducersingleconsumerlist.h @@ -45,17 +45,14 @@ SOFTWARE. template struct LockFreeListItem { public: - inline LockFreeListItem(T val) { + inline LockFreeListItem(T val) : nextItem(nullptr), nextItemReady(false) { value = val; - nextItemReady = false; - nextItem = NULL; - } - inline LockFreeListItem() { - nextItem = NULL; - nextItemReady = false; } + inline LockFreeListItem() : value(), nextItem(nullptr), nextItemReady(false) {} T value; - LockFreeListItem* nextItem; + // Atomic so producer's store(release) pairs with consumer's load(acquire) in consume(), + // regardless of whether consumer arrived via nextItemReady or h==tail path. + std::atomic*> nextItem; std::atomic_bool nextItemReady; }; @@ -64,7 +61,7 @@ class SingleProducerSingleConsumerList { public: inline SingleProducerSingleConsumerList() { head.store(NULL, std::memory_order_relaxed); - tail = NULL; + tail.store(NULL, std::memory_order_relaxed); producerFinished = false; consumerFinished = false; produced = 0; @@ -100,23 +97,24 @@ class SingleProducerSingleConsumerList { // The last node has no successor, so `nextItemReady` may remain false; // it must still be consumable to avoid writer stalls when many queues exist. return h->nextItemReady.load(std::memory_order_acquire) - || (h == tail) + || (h == tail.load(std::memory_order_acquire)) || producerFinished.load(std::memory_order_acquire); } inline void produce(T val) { LockFreeListItem* item = makeItem(val); - if(head.load(std::memory_order_relaxed) == NULL) { - tail = item; - // Release store: publishing head to consumer thread. - // All writes to *item are ordered before this store. + // Acquire: ensures producer sees consumer's latest head advance (e.g. NULL after drain). + // Stale relaxed read would cause else-branch to link new item to consumed tail, + // making it unreachable (data loss) and racing on tail->nextItem. + if(head.load(std::memory_order_acquire) == NULL) { + tail.store(item, std::memory_order_release); head.store(item, std::memory_order_release); - // Signal the first item is consumable (no predecessor to set this) item->nextItemReady.store(true, std::memory_order_release); } else { - tail->nextItem = item; - // Release store: ensures nextItem write visible before nextItemReady flag. - tail->nextItemReady.store(true, std::memory_order_release); - tail = item; + LockFreeListItem* t = tail.load(std::memory_order_relaxed); + // release: pairs with consume()'s nextItem.load(acquire) after nextItemReady check. + t->nextItem.store(item, std::memory_order_release); + t->nextItemReady.store(true, std::memory_order_release); + tail.store(item, std::memory_order_release); } produced.fetch_add(1, std::memory_order_relaxed); } @@ -124,8 +122,16 @@ class SingleProducerSingleConsumerList { LockFreeListItem* h = head.load(std::memory_order_acquire); assert(h != NULL); T val = h->value; + // Only read nextItem when nextItemReady is true; the acquire on nextItemReady + // establishes happens-before for the producer's nextItem.store(release), + // so a relaxed load suffices. If nextItemReady is false (last item in queue), + // skip the read entirely — next = nullptr — to avoid racing with a concurrent + // producer that may be writing nextItem in the else branch of produce(). + LockFreeListItem* next = nullptr; + if(h->nextItemReady.load(std::memory_order_acquire)) + next = h->nextItem.load(std::memory_order_relaxed); // Advance head; release so next canBeConsumed() acquire sees updated state. - head.store(h->nextItem, std::memory_order_release); + head.store(next, std::memory_order_release); unsigned long _c = consumed.fetch_add(1, std::memory_order_relaxed) + 1; if((_c & 0xFFF) == 0) recycle(); @@ -152,7 +158,8 @@ class SingleProducerSingleConsumerList { size_t size = 0x01<<12; if(blocksNum <= blk) { LockFreeListItem* buffer = new LockFreeListItem[size]; - memset(buffer, 0, sizeof(LockFreeListItem) * size); + // No memset: constructors zero-initialise value, nextItem, nextItemReady. + // memset on std::atomic objects is UB. blocks[blocksNum & blocksRingBufferSizeMask] = buffer; blocksNum++; } @@ -172,7 +179,7 @@ class SingleProducerSingleConsumerList { private: std::atomic*> head; - LockFreeListItem* tail; // tail is producer-private, no atomic needed + std::atomic*> tail; // read by consumer in canBeConsumed() LockFreeListItem** blocks; std::atomic_bool producerFinished; std::atomic_bool consumerFinished;