From a3f8d81825de2c9509586e3e821a8e4925247c42 Mon Sep 17 00:00:00 2001 From: wokron Date: Mon, 8 Jun 2026 18:13:35 +0800 Subject: [PATCH 1/5] support external buffer memory in ProvidedBufferPool --- include/condy/provided_buffers.hpp | 88 ++++++++++++++++++++---------- tests/test_buffers.cpp | 64 ++++++++++++++++++++++ 2 files changed, 122 insertions(+), 30 deletions(-) diff --git a/include/condy/provided_buffers.hpp b/include/condy/provided_buffers.hpp index 00e037b..9622417 100644 --- a/include/condy/provided_buffers.hpp +++ b/include/condy/provided_buffers.hpp @@ -249,26 +249,26 @@ namespace detail { class BundledProvidedBufferPool { protected: - BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, - unsigned int flags) - : num_buffers_(std::bit_ceil(num_buffers)), + BundledProvidedBufferPool(void *buffer_data, uint32_t num_buffers, + size_t buffer_size, unsigned int flags) + : buffers_base_(static_cast(buffer_data)), + num_buffers_(std::bit_ceil(num_buffers)), mask_(io_uring_buf_ring_mask(num_buffers_)), buffer_size_(buffer_size), curr_buf_len_(buffer_size), br_flags_(flags) { auto &context = detail::Context::current(); - size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size); - void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE, - MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); - if (data == MAP_FAILED) [[unlikely]] { - throw detail::make_system_error("mmap"); - } - auto d1 = detail::defer([&]() { munmap(data, data_size); }); - bgid_ = context.next_bgid(); auto d2 = detail::defer([&]() { context.recycle_bgid(bgid_); }); - br_ = reinterpret_cast(data); + size_t ring_size = num_buffers_ * sizeof(io_uring_buf); + void *ring_data = mmap(nullptr, ring_size, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + if (ring_data == MAP_FAILED) [[unlikely]] { + throw detail::make_system_error("mmap"); + } + auto d1 = detail::defer([&]() { munmap(ring_data, ring_size); }); + br_ = reinterpret_cast(ring_data); io_uring_buf_ring_init(br_); io_uring_buf_reg reg = {}; @@ -281,11 +281,9 @@ class BundledProvidedBufferPool { throw detail::make_system_error("io_uring_register_buf_ring", -r); } - char *buffer_base = - static_cast(data) + sizeof(io_uring_buf) * num_buffers_; for (size_t bid = 0; bid < num_buffers_; bid++) { - char *ptr = buffer_base + bid * buffer_size; - io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask_, + char *ptr = buffers_base_ + bid * buffer_size_; + io_uring_buf_ring_add(br_, ptr, buffer_size_, bid, mask_, static_cast(bid)); } io_uring_buf_ring_advance(br_, static_cast(num_buffers_)); @@ -297,14 +295,15 @@ class BundledProvidedBufferPool { ~BundledProvidedBufferPool() { assert(br_ != nullptr); auto &context = detail::Context::current(); - size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_); - munmap(br_, data_size); [[maybe_unused]] int r = io_uring_unregister_buf_ring( context.runtime()->ring().ring(), bgid_); assert(r == 0); if (r == 0) { context.recycle_bgid(bgid_); } + + size_t ring_size = num_buffers_ * sizeof(io_uring_buf); + munmap(br_, ring_size); } public: @@ -382,24 +381,18 @@ class BundledProvidedBufferPool { void add_buffer_back(void *ptr, [[maybe_unused]] size_t size) noexcept { assert(size <= buffer_size_); - char *base = get_buffers_base_(); - assert(ptr >= base); - size_t offset = static_cast(ptr) - base; + assert(ptr >= buffers_base_); + size_t offset = static_cast(ptr) - buffers_base_; size_t bid = offset / buffer_size_; assert(bid < num_buffers_); - char *buffer_ptr = base + bid * buffer_size_; + char *buffer_ptr = buffers_base_ + bid * buffer_size_; io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask_, 0); io_uring_buf_ring_advance(br_, 1); } private: char *get_buffer_(uint16_t bid) const noexcept { - return get_buffers_base_() + static_cast(bid) * buffer_size_; - } - - char *get_buffers_base_() const noexcept { - return reinterpret_cast(br_) + - sizeof(io_uring_buf) * num_buffers_; + return buffers_base_ + static_cast(bid) * buffer_size_; } io_uring_buf *curr_io_uring_buf_() noexcept { @@ -408,8 +401,9 @@ class BundledProvidedBufferPool { void advance_io_uring_buf_() noexcept { br_head_++; } -private: +protected: io_uring_buf_ring *br_ = nullptr; + char *buffers_base_ = nullptr; uint32_t num_buffers_; int mask_; uint32_t buffer_size_; @@ -444,7 +438,29 @@ class ProvidedBufferPool : public detail::BundledProvidedBufferPool { */ ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags = 0) - : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {} + : BundledProvidedBufferPool( + alloc_buffer_data_(std::bit_ceil(num_buffers) * buffer_size), + num_buffers, buffer_size, flags), + external_memory_(false) {} + + /** + * @brief Construct with externally provided buffer memory. + * @param buffer_data Pointer to externally allocated buffer memory. + * @param num_buffers Number of buffers in the pool. + * @param buffer_size Size of each buffer in bytes. + * @param flags Optional flags for io_uring buffer registration. + */ + ProvidedBufferPool(void *buffer_data, uint32_t num_buffers, + size_t buffer_size, unsigned int flags = 0) + : BundledProvidedBufferPool(buffer_data, num_buffers, buffer_size, + flags), + external_memory_(true) {} + + ~ProvidedBufferPool() { + if (!external_memory_) { + munmap(buffers_base_, capacity() * buffer_size()); + } + } public: ProvidedBuffer handle_finish(io_uring_cqe *cqe) noexcept { @@ -456,6 +472,18 @@ class ProvidedBufferPool : public detail::BundledProvidedBufferPool { assert(buffers.size() == 1); return std::move(buffers[0]); } + +private: + static void *alloc_buffer_data_(size_t buf_size) { + void *buf_data = mmap(nullptr, buf_size, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + if (buf_data == MAP_FAILED) [[unlikely]] { + throw detail::make_system_error("mmap"); + } + return buf_data; + } + + bool external_memory_ = false; }; /** diff --git a/tests/test_buffers.cpp b/tests/test_buffers.cpp index 7e48e7a..e92947e 100644 --- a/tests/test_buffers.cpp +++ b/tests/test_buffers.cpp @@ -514,6 +514,70 @@ TEST_CASE("test buffers - provided buffer pool usage bundle incr") { } #endif +TEST_CASE("test buffers - provided buffer pool external memory") { + condy::Runtime runtime; + auto &ring = enable_runtime_ring(runtime); + auto d = condy::detail::defer( + []() { condy::detail::Context::current().reset(); }); + + condy::detail::Context::current().init(&runtime); + + constexpr size_t num_bufs = 4; + constexpr size_t buf_size = 32; + size_t total = num_bufs * buf_size; + + void *ext_mem = mmap(nullptr, total, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + REQUIRE(ext_mem != MAP_FAILED); + + { + condy::ProvidedBufferPool pool(ext_mem, num_bufs, buf_size); + REQUIRE(pool.capacity() == num_bufs); + REQUIRE(pool.buffer_size() == buf_size); + + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); + + ssize_t r = ::write(pipefd[1], "hello external buffer pool!", 26); + REQUIRE(r == 26); + + auto *sqe = ring.get_sqe(); + io_uring_prep_read(sqe, pipefd[0], nullptr, 0, 0); + io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT); + sqe->buf_group = pool.bgid(); + io_uring_sqe_set_data(sqe, nullptr); + + int read_result = -1; + condy::ProvidedBuffer ret; + + size_t reaped = 0; + while (reaped < 1) { + ring.submit(); + reaped += ring.reap_completions([&](io_uring_cqe *cqe) { + auto *data = io_uring_cqe_get_data(cqe); + REQUIRE(data == nullptr); + read_result = cqe->res; + ret = pool.handle_finish(cqe); + }); + } + + REQUIRE(read_result == 26); + REQUIRE(ret.data() != nullptr); + REQUIRE(ret.data() >= ext_mem); + REQUIRE(static_cast(ret.data()) < + static_cast(ext_mem) + total); + + ::close(pipefd[0]); + ::close(pipefd[1]); + } + + // Pool destroyed; external memory should still be valid + memset(ext_mem, 0xAB, total); + REQUIRE(static_cast(ext_mem)[0] == static_cast(0xAB)); + + munmap(ext_mem, total); +} + TEST_CASE("test buffers - provided buffer is also buffer") { condy::ProvidedBuffer buf; auto fixed_buf = condy::fixed(2, buf); From a0b227d7fc89396c4bc8eac84dc8e115c909aa91 Mon Sep 17 00:00:00 2001 From: wokron Date: Mon, 8 Jun 2026 17:03:49 +0800 Subject: [PATCH 2/5] move IdPool to utils --- include/condy/detail/context.hpp | 34 -------------------------------- include/condy/detail/utils.hpp | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/include/condy/detail/context.hpp b/include/condy/detail/context.hpp index b3b3e0c..05bf4d2 100644 --- a/include/condy/detail/context.hpp +++ b/include/condy/detail/context.hpp @@ -14,40 +14,6 @@ class Runtime; namespace detail { -template ::max()> -class IdPool { -public: - static_assert(From < To, "Invalid ID range"); - - T allocate() { - if (!recycled_ids_.empty()) { - T id = recycled_ids_.top(); - recycled_ids_.pop(); - return id; - } - if (next_id_ < To) { - return next_id_++; - } - throw std::runtime_error("ID pool exhausted"); - } - - void recycle(T id) noexcept { - assert(From <= id && id < next_id_ && id < To); - recycled_ids_.push(id); - } - - void reset() noexcept { - next_id_ = From; - while (!recycled_ids_.empty()) { - recycled_ids_.pop(); - } - } - -private: - T next_id_ = From; - std::stack recycled_ids_; -}; - class Context : public ThreadLocalSingleton { public: void init(Runtime *runtime) noexcept { diff --git a/include/condy/detail/utils.hpp b/include/condy/detail/utils.hpp index 2b87dda..3102ef2 100644 --- a/include/condy/detail/utils.hpp +++ b/include/condy/detail/utils.hpp @@ -99,6 +99,40 @@ template auto defer(Func &&func) { return Defer>(std::forward(func)); } +template ::max()> +class IdPool { +public: + static_assert(From < To, "Invalid ID range"); + + T allocate() { + if (!recycled_ids_.empty()) { + T id = recycled_ids_.top(); + recycled_ids_.pop(); + return id; + } + if (next_id_ < To) { + return next_id_++; + } + throw std::runtime_error("ID pool exhausted"); + } + + void recycle(T id) noexcept { + assert(From <= id && id < next_id_ && id < To); + recycled_ids_.push(id); + } + + void reset() noexcept { + next_id_ = From; + while (!recycled_ids_.empty()) { + recycled_ids_.pop(); + } + } + +private: + T next_id_ = From; + std::stack recycled_ids_; +}; + [[noreturn]] inline void panic_on(std::string_view msg) noexcept { std::cerr << std::format("Panic: {}\n", msg); #ifndef CRASH_TEST From 3d6be7de893fa491c8e0ace7e554dadb46648977 Mon Sep 17 00:00:00 2001 From: wokron Date: Mon, 8 Jun 2026 17:08:35 +0800 Subject: [PATCH 3/5] move bgid_pool to Runtime --- include/condy/detail/context.hpp | 17 ++----------- include/condy/provided_buffers.hpp | 38 ++++++++++++++++-------------- include/condy/runtime.hpp | 7 ++++++ 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/include/condy/detail/context.hpp b/include/condy/detail/context.hpp index 05bf4d2..6cc797c 100644 --- a/include/condy/detail/context.hpp +++ b/include/condy/detail/context.hpp @@ -5,8 +5,6 @@ #pragma once #include "condy/detail/singleton.hpp" -#include -#include namespace condy { @@ -16,24 +14,13 @@ namespace detail { class Context : public ThreadLocalSingleton { public: - void init(Runtime *runtime) noexcept { - runtime_ = runtime; - bgid_pool_.reset(); - } - void reset() noexcept { - runtime_ = nullptr; - bgid_pool_.reset(); - } + void init(Runtime *runtime) noexcept { runtime_ = runtime; } + void reset() noexcept { runtime_ = nullptr; } Runtime *runtime() noexcept { return runtime_; } - uint16_t next_bgid() { return bgid_pool_.allocate(); } - - void recycle_bgid(uint16_t bgid) noexcept { bgid_pool_.recycle(bgid); } - private: Runtime *runtime_ = nullptr; - IdPool bgid_pool_; }; } // namespace detail diff --git a/include/condy/provided_buffers.hpp b/include/condy/provided_buffers.hpp index 9622417..07426e2 100644 --- a/include/condy/provided_buffers.hpp +++ b/include/condy/provided_buffers.hpp @@ -53,7 +53,8 @@ class BundledProvidedBufferQueue { : capacity_(std::bit_ceil(capacity)), mask_(io_uring_buf_ring_mask(capacity_)), buf_lens_(capacity_, 0), br_flags_(flags) { - auto &context = detail::Context::current(); + runtime_ = detail::Context::current().runtime(); + auto &bgid_pool = runtime_->bgid_pool(); size_t data_size = capacity_ * sizeof(io_uring_buf); void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE, @@ -63,8 +64,8 @@ class BundledProvidedBufferQueue { } auto d1 = detail::defer([&]() { munmap(data, data_size); }); - bgid_ = context.next_bgid(); - auto d2 = detail::defer([&]() { context.recycle_bgid(bgid_); }); + bgid_ = bgid_pool.allocate(); + auto d2 = detail::defer([&]() { bgid_pool.recycle(bgid_); }); br_ = reinterpret_cast(data); io_uring_buf_ring_init(br_); @@ -73,8 +74,8 @@ class BundledProvidedBufferQueue { reg.ring_addr = reinterpret_cast(br_); reg.ring_entries = capacity_; reg.bgid = bgid_; - int r = io_uring_register_buf_ring(context.runtime()->ring().ring(), - ®, br_flags_); + int r = io_uring_register_buf_ring(runtime_->ring().ring(), ®, + br_flags_); if (r != 0) [[unlikely]] { throw detail::make_system_error("io_uring_register_buf_ring", -r); } @@ -85,14 +86,13 @@ class BundledProvidedBufferQueue { ~BundledProvidedBufferQueue() { assert(br_ != nullptr); - auto &context = detail::Context::current(); size_t data_size = capacity_ * sizeof(io_uring_buf); munmap(br_, data_size); - [[maybe_unused]] int r = io_uring_unregister_buf_ring( - context.runtime()->ring().ring(), bgid_); + [[maybe_unused]] int r = + io_uring_unregister_buf_ring(runtime_->ring().ring(), bgid_); assert(r == 0); if (r == 0) { - context.recycle_bgid(bgid_); + runtime_->bgid_pool().recycle(bgid_); } } @@ -190,6 +190,7 @@ class BundledProvidedBufferQueue { } private: + Runtime *runtime_; io_uring_buf_ring *br_ = nullptr; uint32_t size_ = 0; uint32_t capacity_; @@ -256,10 +257,11 @@ class BundledProvidedBufferPool { mask_(io_uring_buf_ring_mask(num_buffers_)), buffer_size_(buffer_size), curr_buf_len_(buffer_size), br_flags_(flags) { - auto &context = detail::Context::current(); + runtime_ = detail::Context::current().runtime(); + auto &bgid_pool = runtime_->bgid_pool(); - bgid_ = context.next_bgid(); - auto d2 = detail::defer([&]() { context.recycle_bgid(bgid_); }); + bgid_ = bgid_pool.allocate(); + auto d2 = detail::defer([&]() { bgid_pool.recycle(bgid_); }); size_t ring_size = num_buffers_ * sizeof(io_uring_buf); void *ring_data = mmap(nullptr, ring_size, PROT_READ | PROT_WRITE, @@ -275,8 +277,8 @@ class BundledProvidedBufferPool { reg.ring_addr = reinterpret_cast(br_); reg.ring_entries = num_buffers_; reg.bgid = bgid_; - int r = io_uring_register_buf_ring(context.runtime()->ring().ring(), - ®, br_flags_); + int r = io_uring_register_buf_ring(runtime_->ring().ring(), ®, + br_flags_); if (r != 0) [[unlikely]] { throw detail::make_system_error("io_uring_register_buf_ring", -r); } @@ -294,12 +296,11 @@ class BundledProvidedBufferPool { ~BundledProvidedBufferPool() { assert(br_ != nullptr); - auto &context = detail::Context::current(); - [[maybe_unused]] int r = io_uring_unregister_buf_ring( - context.runtime()->ring().ring(), bgid_); + [[maybe_unused]] int r = + io_uring_unregister_buf_ring(runtime_->ring().ring(), bgid_); assert(r == 0); if (r == 0) { - context.recycle_bgid(bgid_); + runtime_->bgid_pool().recycle(bgid_); } size_t ring_size = num_buffers_ * sizeof(io_uring_buf); @@ -402,6 +403,7 @@ class BundledProvidedBufferPool { void advance_io_uring_buf_() noexcept { br_head_++; } protected: + Runtime *runtime_; io_uring_buf_ring *br_ = nullptr; char *buffers_base_ = nullptr; uint32_t num_buffers_; diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 744042e..97b2783 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -210,6 +210,12 @@ class Runtime { */ auto &settings() noexcept { return settings_; } + /** + * @brief Get the buffer group ID pool. + * @return IdPool& Reference to the buffer group ID pool. + */ + auto &bgid_pool() noexcept { return bgid_pool_; } + private: static detail::Ring create_ring_(const RuntimeOptions &options) { io_uring_params params; @@ -446,6 +452,7 @@ class Runtime { FdTable fd_table_; BufferTable buffer_table_; RingSettings settings_; + detail::IdPool bgid_pool_; }; /** From 77088705d035493ea7b56b94eb3bba83c3328853 Mon Sep 17 00:00:00 2001 From: wokron Date: Mon, 8 Jun 2026 20:03:51 +0800 Subject: [PATCH 4/5] provided buffer support specific runtime --- include/condy/provided_buffers.hpp | 63 +++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/include/condy/provided_buffers.hpp b/include/condy/provided_buffers.hpp index 07426e2..471d4cd 100644 --- a/include/condy/provided_buffers.hpp +++ b/include/condy/provided_buffers.hpp @@ -49,11 +49,11 @@ namespace detail { class BundledProvidedBufferQueue { protected: - BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags) - : capacity_(std::bit_ceil(capacity)), + BundledProvidedBufferQueue(Runtime *runtime, uint32_t capacity, + unsigned int flags) + : runtime_(runtime), capacity_(std::bit_ceil(capacity)), mask_(io_uring_buf_ring_mask(capacity_)), buf_lens_(capacity_, 0), br_flags_(flags) { - runtime_ = detail::Context::current().runtime(); auto &bgid_pool = runtime_->bgid_pool(); size_t data_size = capacity_ * sizeof(io_uring_buf); @@ -222,7 +222,20 @@ class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue { * (default: 0). */ ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0) - : BundledProvidedBufferQueue(capacity, flags) {} + : ProvidedBufferQueue(*detail::Context::current().runtime(), capacity, + flags) {} + + /** + * @brief Construct a new Provided Buffer Queue object with specified + * Runtime. + * @param runtime The Runtime to associate with this buffer queue. + * @param capacity Number of buffers the queue can hold. + * @param flags Optional flags for io_uring buffer ring registration + * (default: 0). + */ + ProvidedBufferQueue(Runtime &runtime, uint32_t capacity, + unsigned int flags = 0) + : BundledProvidedBufferQueue(&runtime, capacity, flags) {} BufferInfo handle_finish(io_uring_cqe *cqe) noexcept { assert(cqe != nullptr); @@ -250,14 +263,14 @@ namespace detail { class BundledProvidedBufferPool { protected: - BundledProvidedBufferPool(void *buffer_data, uint32_t num_buffers, - size_t buffer_size, unsigned int flags) - : buffers_base_(static_cast(buffer_data)), + BundledProvidedBufferPool(Runtime *runtime, void *buffer_data, + uint32_t num_buffers, size_t buffer_size, + unsigned int flags) + : runtime_(runtime), buffers_base_(static_cast(buffer_data)), num_buffers_(std::bit_ceil(num_buffers)), mask_(io_uring_buf_ring_mask(num_buffers_)), buffer_size_(buffer_size), curr_buf_len_(buffer_size), br_flags_(flags) { - runtime_ = detail::Context::current().runtime(); auto &bgid_pool = runtime_->bgid_pool(); bgid_ = bgid_pool.allocate(); @@ -440,7 +453,22 @@ class ProvidedBufferPool : public detail::BundledProvidedBufferPool { */ ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags = 0) + : ProvidedBufferPool(*detail::Context::current().runtime(), num_buffers, + buffer_size, flags) {} + + /** + * @brief Construct a new Provided Buffer Pool object with specified + * Runtime. + * @param runtime The Runtime to associate with this buffer pool. + * @param num_buffers Number of buffers to allocate in the pool. + * @param buffer_size Size of each buffer in bytes. + * @param flags Optional flags for io_uring buffer registration (default: + * 0). + */ + ProvidedBufferPool(Runtime &runtime, uint32_t num_buffers, + size_t buffer_size, unsigned int flags = 0) : BundledProvidedBufferPool( + &runtime, alloc_buffer_data_(std::bit_ceil(num_buffers) * buffer_size), num_buffers, buffer_size, flags), external_memory_(false) {} @@ -454,8 +482,23 @@ class ProvidedBufferPool : public detail::BundledProvidedBufferPool { */ ProvidedBufferPool(void *buffer_data, uint32_t num_buffers, size_t buffer_size, unsigned int flags = 0) - : BundledProvidedBufferPool(buffer_data, num_buffers, buffer_size, - flags), + : ProvidedBufferPool(*detail::Context::current().runtime(), buffer_data, + num_buffers, buffer_size, flags) {} + + /** + * @brief Construct with externally provided buffer memory and specified + * Runtime. + * @param runtime The Runtime to associate with this buffer pool. + * @param buffer_data Pointer to externally allocated buffer memory. + * @param num_buffers Number of buffers in the pool. + * @param buffer_size Size of each buffer in bytes. + * @param flags Optional flags for io_uring buffer registration. + */ + ProvidedBufferPool(Runtime &runtime, void *buffer_data, + uint32_t num_buffers, size_t buffer_size, + unsigned int flags = 0) + : BundledProvidedBufferPool(&runtime, buffer_data, num_buffers, + buffer_size, flags), external_memory_(true) {} ~ProvidedBufferPool() { From 4b88488c81ce3f32fd572eb1d86b6658e9275a2e Mon Sep 17 00:00:00 2001 From: wokron Date: Mon, 8 Jun 2026 20:09:18 +0800 Subject: [PATCH 5/5] add tests --- tests/test_buffers.cpp | 153 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/tests/test_buffers.cpp b/tests/test_buffers.cpp index e92947e..e9f1d32 100644 --- a/tests/test_buffers.cpp +++ b/tests/test_buffers.cpp @@ -588,4 +588,157 @@ TEST_CASE("test buffers - provided buffer is also buffer") { // also ok [[maybe_unused]] auto aw2 = condy::async_read(0, buf, 0); +} + +TEST_CASE("test buffers - provided buffer queue before ring enabled") { + condy::Runtime runtime; + + // Creating a provided buffer queue should succeed even before the ring is + // enabled, since io_uring_register_buf_ring does not require an enabled + // ring. + condy::ProvidedBufferQueue queue(runtime, 4); + REQUIRE(queue.capacity() == (1 << 2)); + + char buf1[32], buf2[32]; + REQUIRE(queue.push(condy::buffer(buf1)) == 0); + REQUIRE(queue.size() == 1); + REQUIRE(queue.push(condy::buffer(buf2)) == 1); + REQUIRE(queue.size() == 2); + + // Now enable the ring and verify I/O still works. + auto &ring = enable_runtime_ring(runtime); + + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); + ssize_t r = ::write(pipefd[1], "data", 4); + REQUIRE(r == 4); + + auto *sqe = ring.get_sqe(); + io_uring_prep_read(sqe, pipefd[0], nullptr, 0, 0); + io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT); + sqe->buf_group = queue.bgid(); + io_uring_sqe_set_data(sqe, nullptr); + + condy::BufferInfo ret; + size_t reaped = 0; + while (reaped < 1) { + ring.submit(); + reaped += ring.reap_completions([&](io_uring_cqe *cqe) { + REQUIRE(io_uring_cqe_get_data(cqe) == nullptr); + r = cqe->res; + ret = queue.handle_finish(cqe); + }); + } + + REQUIRE(r > 0); + REQUIRE(ret.num_buffers == 1); + REQUIRE(ret.bid == 0); + REQUIRE(queue.size() == 1); + REQUIRE(std::memcmp(buf1, "data", 4) == 0); + + ::close(pipefd[0]); + ::close(pipefd[1]); +} + +TEST_CASE("test buffers - provided buffer pool before ring enabled") { + condy::Runtime runtime; + + // Creating a provided buffer pool should succeed even before the ring is + // enabled. + condy::ProvidedBufferPool pool(runtime, 4, 16); + REQUIRE(pool.capacity() == (1 << 2)); + + // Now enable the ring and verify I/O still works. + auto &ring = enable_runtime_ring(runtime); + + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); + ssize_t r = ::write(pipefd[1], "data", 4); + REQUIRE(r == 4); + + auto *sqe = ring.get_sqe(); + io_uring_prep_read(sqe, pipefd[0], nullptr, 0, 0); + io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT); + sqe->buf_group = pool.bgid(); + io_uring_sqe_set_data(sqe, nullptr); + + condy::ProvidedBuffer ret; + size_t reaped = 0; + while (reaped < 1) { + ring.submit(); + reaped += ring.reap_completions([&](io_uring_cqe *cqe) { + REQUIRE(io_uring_cqe_get_data(cqe) == nullptr); + r = cqe->res; + ret = pool.handle_finish(cqe); + }); + } + + REQUIRE(r == 4); + REQUIRE(ret.owns_buffer()); + REQUIRE(ret.size() == 16); + REQUIRE(std::memcmp(ret.data(), "data", 4) == 0); + + ::close(pipefd[0]); + ::close(pipefd[1]); +} + +TEST_CASE("test buffers - provided buffer pool external memory explicit " + "runtime") { + condy::Runtime runtime; + auto &ring = enable_runtime_ring(runtime); + + constexpr size_t num_bufs = 4; + constexpr size_t buf_size = 32; + size_t total = num_bufs * buf_size; + + void *ext_mem = mmap(nullptr, total, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + REQUIRE(ext_mem != MAP_FAILED); + + { + condy::ProvidedBufferPool pool(runtime, ext_mem, num_bufs, buf_size); + REQUIRE(pool.capacity() == num_bufs); + REQUIRE(pool.buffer_size() == buf_size); + + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); + + ssize_t r = ::write(pipefd[1], "hello explicit runtime pool!", 27); + REQUIRE(r == 27); + + auto *sqe = ring.get_sqe(); + io_uring_prep_read(sqe, pipefd[0], nullptr, 0, 0); + io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT); + sqe->buf_group = pool.bgid(); + io_uring_sqe_set_data(sqe, nullptr); + + int read_result = -1; + condy::ProvidedBuffer ret; + + size_t reaped = 0; + while (reaped < 1) { + ring.submit(); + reaped += ring.reap_completions([&](io_uring_cqe *cqe) { + auto *data = io_uring_cqe_get_data(cqe); + REQUIRE(data == nullptr); + read_result = cqe->res; + ret = pool.handle_finish(cqe); + }); + } + + REQUIRE(read_result == 27); + REQUIRE(ret.data() != nullptr); + REQUIRE(ret.data() >= ext_mem); + REQUIRE(static_cast(ret.data()) < + static_cast(ext_mem) + total); + + ::close(pipefd[0]); + ::close(pipefd[1]); + } + + // Pool destroyed; external memory should still be valid + memset(ext_mem, 0xAB, total); + REQUIRE(static_cast(ext_mem)[0] == static_cast(0xAB)); + + munmap(ext_mem, total); } \ No newline at end of file