Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion c++/include/orc/MemoryPool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ namespace orc {
uint64_t currentSize_;
// maximal capacity (actual allocated memory)
uint64_t currentCapacity_;
// whether this buffer owns memory lifecycle
bool ownBuffer_;

// not implemented
DataBuffer(DataBuffer& buffer);
DataBuffer& operator=(DataBuffer& buffer);

public:
DataBuffer(MemoryPool& pool, uint64_t size = 0);
DataBuffer(MemoryPool& pool, uint64_t size = 0, bool ownBuf = true);

Comment on lines 52 to 54
DataBuffer(DataBuffer<T>&& buffer) noexcept;

Expand Down Expand Up @@ -81,6 +83,9 @@ namespace orc {
void reserve(uint64_t size);
void resize(uint64_t size);
void zeroOut();

// Attach to an external raw buffer. bufSize is in bytes.
void setData(T* buf, size_t bufSize);
};

// Specializations for char
Expand Down
92 changes: 77 additions & 15 deletions c++/src/MemoryPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string.h>
#include <cstdlib>
#include <iostream>
#include <type_traits>

namespace orc {

Expand Down Expand Up @@ -52,35 +53,51 @@ namespace orc {
}

template <class T>
DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize)
: memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0) {
reserve(newSize);
currentSize_ = newSize;
DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize, bool ownBuf)
: memoryPool_(pool),
buf_(nullptr),
currentSize_(0),
currentCapacity_(0),
ownBuffer_(ownBuf) {
if (ownBuffer_) {
reserve(newSize);
currentSize_ = newSize;
}
}

template <class T>
DataBuffer<T>::DataBuffer(DataBuffer<T>&& buffer) noexcept
: memoryPool_(buffer.memoryPool_),
buf_(buffer.buf_),
currentSize_(buffer.currentSize_),
currentCapacity_(buffer.currentCapacity_) {
currentCapacity_(buffer.currentCapacity_),
ownBuffer_(buffer.ownBuffer_) {
buffer.buf_ = nullptr;
buffer.currentSize_ = 0;
buffer.currentCapacity_ = 0;
buffer.ownBuffer_ = true;
}

template <class T>
DataBuffer<T>::~DataBuffer() {
if (!ownBuffer_) {
return;
}
for (uint64_t i = currentSize_; i > 0; --i) {
(buf_ + i - 1)->~T();
}
if (buf_) {
static_assert(std::is_trivially_copyable<T>::value,
"Only trivially copyable type is supported for DataBuffer reserve");
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
Comment on lines 82 to 93
}

template <class T>
void DataBuffer<T>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (currentSize_ > newSize) {
for (uint64_t i = currentSize_; i > newSize; --i) {
Expand All @@ -96,6 +113,9 @@ namespace orc {

template <class T>
void DataBuffer<T>::reserve(uint64_t newCapacity) {
if (!ownBuffer_) {
return;
}
if (newCapacity > currentCapacity_ || !buf_) {
if (buf_) {
T* buf_old = buf_;
Expand All @@ -114,6 +134,18 @@ namespace orc {
memset(buf_, 0, sizeof(T) * currentCapacity_);
}

template <class T>
void DataBuffer<T>::setData(T* buffer, size_t bufSize) {
if (ownBuffer_ && buf_) {
static_assert(std::is_trivially_copyable<T>::value,
"Only trivially copyable type is supported for DataBuffer reserve");
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
ownBuffer_ = false;
buf_ = buffer;
currentSize_ = currentCapacity_ = static_cast<uint64_t>(bufSize / sizeof(T));
}
Comment on lines +138 to +147

// Specializations for Int128
template <>
void DataBuffer<Int128>::zeroOut() {
Expand All @@ -126,13 +158,16 @@ namespace orc {

template <>
DataBuffer<char>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<char>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
Expand All @@ -144,13 +179,16 @@ namespace orc {

template <>
DataBuffer<char*>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<char*>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(char*));
Expand All @@ -162,13 +200,16 @@ namespace orc {

template <>
DataBuffer<double>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<double>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(double));
Expand All @@ -180,13 +221,16 @@ namespace orc {

template <>
DataBuffer<float>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<float>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(float));
Expand All @@ -198,13 +242,16 @@ namespace orc {

template <>
DataBuffer<int64_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int64_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int64_t));
Expand All @@ -216,13 +263,16 @@ namespace orc {

template <>
DataBuffer<int32_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int32_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int32_t));
Expand All @@ -234,13 +284,16 @@ namespace orc {

template <>
DataBuffer<int16_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int16_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int16_t));
Expand All @@ -252,13 +305,16 @@ namespace orc {

template <>
DataBuffer<int8_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int8_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int8_t));
Expand All @@ -270,13 +326,16 @@ namespace orc {

template <>
DataBuffer<uint64_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<uint64_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(uint64_t));
Expand All @@ -288,13 +347,16 @@ namespace orc {

template <>
DataBuffer<unsigned char>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<unsigned char>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
Expand Down
36 changes: 36 additions & 0 deletions c++/test/TestCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@

namespace orc {

class CountingMemoryPool : public MemoryPool {
public:
uint64_t allocCount = 0;
uint64_t freeCount = 0;

char* malloc(uint64_t size) override {
++allocCount;
return static_cast<char*>(std::malloc(size));
}

void free(char* p) override {
++freeCount;
std::free(p);
}
Comment on lines +34 to +42
};

TEST(TestReadRangeCombiner, testBasics) {
ReadRangeCombiner combinator{0, 100};
/// Ranges with partial overlap and identical offsets
Expand Down Expand Up @@ -139,4 +155,24 @@ namespace orc {
slice = cache.read({20, 2});
assert_slice_equal(slice, "uv");
}

TEST(TestDataBuffer, testExternalBufferNonOwning) {
CountingMemoryPool pool;
char external[16] = {0};
uint64_t freeCountAfterSetData = 0;

{
DataBuffer<char> buffer(pool, 0);
buffer.setData(external, sizeof(external));
freeCountAfterSetData = pool.freeCount;

// Non-owning buffer should keep external size and ignore resize.
EXPECT_EQ(sizeof(external), buffer.size());
buffer.resize(8);
EXPECT_EQ(sizeof(external), buffer.size());
}

// setData may release previously owned internal memory, but destruction should not free external.
EXPECT_EQ(freeCountAfterSetData, pool.freeCount);
}
} // namespace orc