Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ memmap2 = "0.9.5"
interval-heap = "0.0.5"
log = "0.4.28"
lz4_flex = "0.12"
libc = "0.2.177"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
fs2 = "0.4.3"
Expand Down
28 changes: 0 additions & 28 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ use crate::Value;

/// Kind constants for differentiating cache entry types
const KIND_DATA: u8 = 0;
const KIND_INDEX: u8 = 1;
const KIND_VLOG: u8 = 2;

#[derive(Clone)]
pub(crate) enum Item {
Data(Arc<Block>),
Index(Arc<Block>),
VLog(Value),
}

Expand Down Expand Up @@ -54,7 +52,6 @@ impl Weighter<CacheKey, Item> for BlockWeighter {
fn weight(&self, _: &CacheKey, item: &Item) -> u64 {
match item {
Item::Data(block) => block.size() as u64,
Item::Index(block) => block.size() as u64,
Item::VLog(value) => value.len() as u64,
}
}
Expand Down Expand Up @@ -101,11 +98,6 @@ impl BlockCache {
self.data.insert((KIND_DATA, table_id, offset).into(), Item::Data(block));
}

/// Inserts an index block into the cache.
pub(crate) fn insert_index_block(&self, table_id: u64, offset: u64, block: Arc<Block>) {
self.data.insert((KIND_INDEX, table_id, offset).into(), Item::Index(block));
}

/// Inserts a VLog value into the cache.
pub(crate) fn insert_vlog(&self, file_id: u32, offset: u64, value: Value) {
self.data.insert((KIND_VLOG, file_id as u64, offset).into(), Item::VLog(value));
Expand All @@ -131,26 +123,6 @@ impl BlockCache {
}
}

/// Retrieves an index block from the cache.
pub(crate) fn get_index_block(&self, table_id: u64, offset: u64) -> Option<Arc<Block>> {
let key = (KIND_INDEX, table_id, &offset);
let item = self.data.get(&key);

#[cfg(test)]
{
if item.is_some() {
self.index_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} else {
self.index_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}

match item.as_ref()? {
Item::Index(block) => Some(Arc::clone(block)),
_ => None,
}
}

/// Retrieves a VLog value from the cache.
pub(crate) fn get_vlog(&self, file_id: u32, offset: u64) -> Option<Value> {
let key = (KIND_VLOG, file_id as u64, &offset);
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum Error {
message: String,
},
SSTable(crate::sstable::error::SSTableError), // SSTable-specific errors
PartitionBlockExpectedButNotFound,
}

// Implementation of Display trait for Error
Expand Down Expand Up @@ -106,6 +107,7 @@ impl fmt::Display for Error {
segment_id, offset, message
),
Self::SSTable(err) => write!(f, "SSTable error: {err}"),
Self::PartitionBlockExpectedButNotFound => write!(f, "Partition block expected but not found"),
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ pub struct Options {
/// Controls behavior when WAL corruption is detected during recovery.
/// Default: TolerateCorruptedWithRepair (attempt repair and continue)
pub wal_recovery_mode: WalRecoveryMode,

pub max_auto_readahead_size: usize,
pub initial_auto_readahead_size: usize,
}

impl Default for Options {
Expand Down Expand Up @@ -233,6 +236,8 @@ impl Default for Options {
clock,
flush_on_close: true,
wal_recovery_mode: WalRecoveryMode::default(),
max_auto_readahead_size: 256 * 1024, // 256KB
initial_auto_readahead_size: 8 * 1024, // 8KB
}
}
}
Expand Down Expand Up @@ -535,6 +540,16 @@ impl Options {

Ok(())
}

pub const fn with_max_auto_readahead_size(mut self, size: usize) -> Self {
self.max_auto_readahead_size = size;
self
}

pub const fn with_initial_auto_readahead_size(mut self, size: usize) -> Self {
self.initial_auto_readahead_size = size;
self
}
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
Expand Down
149 changes: 128 additions & 21 deletions src/sstable/index_block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Note: Needs to be tested if a top-level index improves performance as such.
// TODO: Replace the current non-partitioned index block writer with this
use std::cmp::Ordering;
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;

Expand Down Expand Up @@ -183,9 +184,7 @@ pub(crate) struct TopLevelIndex {
pub(crate) id: u64,
pub(crate) opts: Arc<Options>,
pub(crate) blocks: Vec<BlockHandleWithKey>,
// TODO: Fix this, as this could be problematic if the file is being shared across without any
// mutex
pub(crate) file: Arc<dyn File>,
pub(crate) partition_map: HashMap<u64, Arc<Block>>,
}

impl TopLevelIndex {
Expand All @@ -199,20 +198,29 @@ impl TopLevelIndex {
read_table_block(Arc::clone(&opt.internal_comparator), Arc::clone(&f), location)?;
let iter = block.iter(false)?;
let mut blocks = Vec::new();
let mut partition_map = HashMap::new();
for item in iter {
let (key, handle) = item?;
// Store full encoded internal key for correct partition lookup
let (handle, _) = BlockHandle::decode(&handle)?;
blocks.push(BlockHandleWithKey {
let block_handle = BlockHandleWithKey {
separator_key: key,
handle,
});
};
let block = Arc::new(read_table_block(
Arc::clone(&opt.internal_comparator),
Arc::clone(&f),
&block_handle.handle,
)?);

partition_map.insert(block_handle.offset(), block);
blocks.push(block_handle);
}
Ok(TopLevelIndex {
id,
opts: opt,
blocks,
file: Arc::clone(&f),
partition_map,
})
}

Expand Down Expand Up @@ -246,23 +254,11 @@ impl TopLevelIndex {
}

pub(crate) fn load_block(&self, block_handle: &BlockHandleWithKey) -> Result<Arc<Block>> {
if let Some(block) = self.opts.block_cache.get_index_block(self.id, block_handle.offset()) {
return Ok(block);
if let Some(block) = self.partition_map.get(&block_handle.offset()) {
return Ok(Arc::clone(block));
}

let block_data = read_table_block(
Arc::clone(&self.opts.internal_comparator),
Arc::clone(&self.file),
&block_handle.handle,
)?;
let block = Arc::new(block_data);
self.opts.block_cache.insert_index_block(
self.id,
block_handle.offset(),
Arc::clone(&block),
);

Ok(block)
Err(Error::PartitionBlockExpectedButNotFound)
}

pub(crate) fn get(&self, target: &[u8]) -> Result<Arc<Block>> {
Expand All @@ -275,3 +271,114 @@ impl TopLevelIndex {
}
}
}

/// Handles readahead for sequential block access patterns.
///
/// Tracks the previous read (offset + length) to detect sequential access,
/// counts consecutive sequential reads, and grows an exponentially-sized
/// readahead window (capped at `max_auto_readahead_size`) that is issued via
/// FS-level prefetch when the file supports it.
pub(crate) struct BlockPrefetcher {
    // Current readahead window size used for FS prefetching; doubles on each
    // successful prefetch, capped at `max_auto_readahead_size`.
    pub(crate) readahead_size: usize,
    // End offset (exclusive) of the region already prefetched; reads that
    // fall entirely below this limit need no further prefetch.
    readahead_limit: u64,
    // Window size to restart from when the sequential pattern breaks
    // (clamped to `max_auto_readahead_size` at construction/reset).
    initial_auto_readahead_size: usize,
    // Hard cap on the exponential growth of `readahead_size`; a value of 0
    // disables auto readahead entirely.
    max_auto_readahead_size: usize,
    // Count of consecutive sequential reads; prefetching only kicks in after
    // more than two such reads.
    num_sequential_reads: usize,
    // Offset of the previous read, used for sequential-access detection.
    prev_offset: u64,
    // Length of the previous read; 0 means "no previous read yet".
    prev_len: usize,
}

impl BlockPrefetcher {
    /// Creates a prefetcher whose initial readahead window is clamped so it
    /// never exceeds `max_auto_readahead_size`.
    pub(crate) fn new(initial_auto_readahead_size: usize, max_auto_readahead_size: usize) -> Self {
        // Clamp the initial window against the configured maximum.
        let sanitized_initial = initial_auto_readahead_size.min(max_auto_readahead_size);

        Self {
            readahead_size: sanitized_initial,
            readahead_limit: 0,
            initial_auto_readahead_size: sanitized_initial,
            max_auto_readahead_size,
            num_sequential_reads: 0,
            prev_offset: 0,
            prev_len: 0,
        }
    }

    /// Records the most recent read so the next access can be classified as
    /// sequential or random.
    pub(crate) fn update_read_pattern(&mut self, offset: u64, len: usize) {
        self.prev_offset = offset;
        self.prev_len = len;
    }

    /// Returns `true` when the read at `offset` starts exactly where the
    /// previous read ended. The very first read (`prev_len == 0`) is also
    /// treated as sequential.
    pub(crate) fn is_block_sequential(&self, offset: u64) -> bool {
        if self.prev_len == 0 {
            return true;
        }
        self.prev_offset + self.prev_len as u64 == offset
    }

    /// Resets sequential-read tracking and shrinks the window back to the
    /// (re-clamped) initial size. Called when the access pattern breaks.
    pub(crate) fn reset_values(&mut self, initial_auto_readahead_size: usize) {
        // The supplied initial size is sanitized against the maximum, just
        // like in `new`.
        let clamped = initial_auto_readahead_size.min(self.max_auto_readahead_size);
        self.num_sequential_reads = 1;
        self.initial_auto_readahead_size = clamped;
        self.readahead_size = clamped;
        self.readahead_limit = 0;
    }

    /// Decides whether the caller should perform its own buffered prefetch.
    ///
    /// Returns `false` when no caller-side work is needed: the block is
    /// already inside the prefetched region, the access is not sequential,
    /// auto readahead is disabled, too few sequential reads have occurred, or
    /// an FS-level prefetch succeeded. Returns `true` only when readahead is
    /// warranted but the filesystem could not provide it.
    pub(crate) fn prefetch_if_needed(
        &mut self,
        handle: &BlockHandle,
        file: &dyn crate::vfs::File,
    ) -> bool {
        let offset = handle.offset() as u64;
        let len = handle.size() as u64;

        // Already covered by a previous prefetch — nothing more to do.
        if offset + len <= self.readahead_limit {
            self.update_read_pattern(offset, len as usize);
            return false;
        }

        // Classify against the *previous* read before recording this one.
        let sequential = self.is_block_sequential(offset);
        self.update_read_pattern(offset, len as usize);

        if !sequential {
            // Pattern broke: restart tracking from the initial window size.
            self.reset_values(self.initial_auto_readahead_size);
            return false;
        }

        // A zero initial or maximum window disables auto readahead.
        if self.initial_auto_readahead_size == 0 || self.max_auto_readahead_size == 0 {
            return false;
        }

        self.num_sequential_reads += 1;
        // Require more than two consecutive sequential reads before engaging
        // readahead (default num_sequential_reads_for_auto_readahead).
        if self.num_sequential_reads <= 2 {
            return false;
        }

        // Prefer an FS-level prefetch when the file supports it; on success
        // extend the covered region and grow the window for next time.
        if file.supports_prefetch()
            && file.prefetch(offset, len as usize + self.readahead_size).is_ok()
        {
            self.readahead_limit = offset + len + self.readahead_size as u64;
            self.grow_readahead_size();
            return false;
        }

        // FS prefetch unsupported or failed: caller should prefetch itself.
        true
    }

    /// Doubles the readahead window, never exceeding the configured maximum.
    pub(crate) fn grow_readahead_size(&mut self) {
        if self.readahead_size < self.max_auto_readahead_size {
            self.readahead_size = (self.readahead_size * 2).min(self.max_auto_readahead_size);
        }
    }
}
10 changes: 9 additions & 1 deletion src/sstable/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::error::{Error, Result};
use crate::sstable::block::{Block, BlockData, BlockHandle, BlockIterator, BlockWriter};
use crate::sstable::error::SSTableError;
use crate::sstable::filter_block::{FilterBlockReader, FilterBlockWriter};
use crate::sstable::index_block::{TopLevelIndex, TopLevelIndexWriter};
use crate::sstable::index_block::{BlockPrefetcher, TopLevelIndex, TopLevelIndexWriter};
use crate::sstable::meta::TableMetadata;
use crate::sstable::{
InternalKey,
Expand Down Expand Up @@ -843,6 +843,10 @@ impl Table {
keys_only,
range,
reverse_started: false,
block_prefetcher: BlockPrefetcher::new(
self.opts.initial_auto_readahead_size,
self.opts.max_auto_readahead_size,
),
}
}

Expand Down Expand Up @@ -913,6 +917,8 @@ pub(crate) struct TableIterator {
/// Whether reverse iteration has started (to distinguish from just
/// positioned)
reverse_started: bool,
// Block prefetcher for readahead during iteration
block_prefetcher: BlockPrefetcher,
}

impl TableIterator {
Expand Down Expand Up @@ -975,6 +981,8 @@ impl TableIterator {
}

fn load_block(&mut self, handle: &BlockHandle) -> Result<()> {
self.block_prefetcher.prefetch_if_needed(handle, self.table.file.as_ref());

let block = self.table.read_block(handle)?;
let mut block_iter = block.iter(self.keys_only)?;

Expand Down
Loading
Loading