Skip to content

Commit e012cf7

Browse files
committed
prefetch partitions
1 parent 11965e4 commit e012cf7

9 files changed

Lines changed: 369 additions & 74 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ memmap2 = "0.9.5"
4141
interval-heap = "0.0.5"
4242
log = "0.4.28"
4343
lz4_flex = "0.12"
44+
libc = "0.2.177"
4445

4546
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
4647
fs2 = "0.4.3"

src/cache.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ use crate::Value;
1010

1111
/// Kind constants for differentiating cache entry types
1212
const KIND_DATA: u8 = 0;
13-
const KIND_INDEX: u8 = 1;
1413
const KIND_VLOG: u8 = 2;
1514

1615
#[derive(Clone)]
1716
pub(crate) enum Item {
1817
Data(Arc<Block>),
19-
Index(Arc<Block>),
2018
VLog(Value),
2119
}
2220

@@ -54,7 +52,6 @@ impl Weighter<CacheKey, Item> for BlockWeighter {
5452
fn weight(&self, _: &CacheKey, item: &Item) -> u64 {
5553
match item {
5654
Item::Data(block) => block.size() as u64,
57-
Item::Index(block) => block.size() as u64,
5855
Item::VLog(value) => value.len() as u64,
5956
}
6057
}
@@ -101,11 +98,6 @@ impl BlockCache {
10198
self.data.insert((KIND_DATA, table_id, offset).into(), Item::Data(block));
10299
}
103100

104-
/// Inserts an index block into the cache.
105-
pub(crate) fn insert_index_block(&self, table_id: u64, offset: u64, block: Arc<Block>) {
106-
self.data.insert((KIND_INDEX, table_id, offset).into(), Item::Index(block));
107-
}
108-
109101
/// Inserts a VLog value into the cache.
110102
pub(crate) fn insert_vlog(&self, file_id: u32, offset: u64, value: Value) {
111103
self.data.insert((KIND_VLOG, file_id as u64, offset).into(), Item::VLog(value));
@@ -131,26 +123,6 @@ impl BlockCache {
131123
}
132124
}
133125

134-
/// Retrieves an index block from the cache.
135-
pub(crate) fn get_index_block(&self, table_id: u64, offset: u64) -> Option<Arc<Block>> {
136-
let key = (KIND_INDEX, table_id, &offset);
137-
let item = self.data.get(&key);
138-
139-
#[cfg(test)]
140-
{
141-
if item.is_some() {
142-
self.index_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
143-
} else {
144-
self.index_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
145-
}
146-
}
147-
148-
match item.as_ref()? {
149-
Item::Index(block) => Some(Arc::clone(block)),
150-
_ => None,
151-
}
152-
}
153-
154126
/// Retrieves a VLog value from the cache.
155127
pub(crate) fn get_vlog(&self, file_id: u32, offset: u64) -> Option<Value> {
156128
let key = (KIND_VLOG, file_id as u64, &offset);

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub enum Error {
5757
message: String,
5858
},
5959
SSTable(crate::sstable::error::SSTableError), // SSTable-specific errors
60+
PartitionBlockExpectedButNotFound,
6061
}
6162

6263
// Implementation of Display trait for Error
@@ -106,6 +107,7 @@ impl fmt::Display for Error {
106107
segment_id, offset, message
107108
),
108109
Self::SSTable(err) => write!(f, "SSTable error: {err}"),
110+
Self::PartitionBlockExpectedButNotFound => write!(f, "Partition block expected but not found"),
109111
}
110112
}
111113
}

src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ pub struct Options {
200200
/// Controls behavior when WAL corruption is detected during recovery.
201201
/// Default: TolerateCorruptedWithRepair (attempt repair and continue)
202202
pub wal_recovery_mode: WalRecoveryMode,
203+
204+
pub max_auto_readahead_size: usize,
205+
pub initial_auto_readahead_size: usize,
203206
}
204207

205208
impl Default for Options {
@@ -233,6 +236,8 @@ impl Default for Options {
233236
clock,
234237
flush_on_close: true,
235238
wal_recovery_mode: WalRecoveryMode::default(),
239+
max_auto_readahead_size: 256 * 1024, // 256KB
240+
initial_auto_readahead_size: 8 * 1024, // 8KB
236241
}
237242
}
238243
}
@@ -535,6 +540,16 @@ impl Options {
535540

536541
Ok(())
537542
}
543+
544+
pub const fn with_max_auto_readahead_size(mut self, size: usize) -> Self {
545+
self.max_auto_readahead_size = size;
546+
self
547+
}
548+
549+
pub const fn with_initial_auto_readahead_size(mut self, size: usize) -> Self {
550+
self.initial_auto_readahead_size = size;
551+
self
552+
}
538553
}
539554

540555
#[derive(Debug, PartialEq, Eq, Copy, Clone)]

src/sstable/index_block.rs

Lines changed: 128 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Note: Needs to be tested if a top-level index improves performance as such.
22
// TODO: Replace the current non-partitioned index block writer with this
33
use std::cmp::Ordering;
4+
use std::collections::HashMap;
45
use std::io::Write;
56
use std::sync::Arc;
67

@@ -183,9 +184,7 @@ pub(crate) struct TopLevelIndex {
183184
pub(crate) id: u64,
184185
pub(crate) opts: Arc<Options>,
185186
pub(crate) blocks: Vec<BlockHandleWithKey>,
186-
// TODO: Fix this, as this could be problematic if the file is being shared across without any
187-
// mutex
188-
pub(crate) file: Arc<dyn File>,
187+
pub(crate) partition_map: HashMap<u64, Arc<Block>>,
189188
}
190189

191190
impl TopLevelIndex {
@@ -199,20 +198,29 @@ impl TopLevelIndex {
199198
read_table_block(Arc::clone(&opt.internal_comparator), Arc::clone(&f), location)?;
200199
let iter = block.iter(false)?;
201200
let mut blocks = Vec::new();
201+
let mut partition_map = HashMap::new();
202202
for item in iter {
203203
let (key, handle) = item?;
204204
// Store full encoded internal key for correct partition lookup
205205
let (handle, _) = BlockHandle::decode(&handle)?;
206-
blocks.push(BlockHandleWithKey {
206+
let block_handle = BlockHandleWithKey {
207207
separator_key: key,
208208
handle,
209-
});
209+
};
210+
let block = Arc::new(read_table_block(
211+
Arc::clone(&opt.internal_comparator),
212+
Arc::clone(&f),
213+
&block_handle.handle,
214+
)?);
215+
216+
partition_map.insert(block_handle.offset(), block);
217+
blocks.push(block_handle);
210218
}
211219
Ok(TopLevelIndex {
212220
id,
213221
opts: opt,
214222
blocks,
215-
file: Arc::clone(&f),
223+
partition_map,
216224
})
217225
}
218226

@@ -246,23 +254,11 @@ impl TopLevelIndex {
246254
}
247255

248256
pub(crate) fn load_block(&self, block_handle: &BlockHandleWithKey) -> Result<Arc<Block>> {
249-
if let Some(block) = self.opts.block_cache.get_index_block(self.id, block_handle.offset()) {
250-
return Ok(block);
257+
if let Some(block) = self.partition_map.get(&block_handle.offset()) {
258+
return Ok(Arc::clone(block));
251259
}
252260

253-
let block_data = read_table_block(
254-
Arc::clone(&self.opts.internal_comparator),
255-
Arc::clone(&self.file),
256-
&block_handle.handle,
257-
)?;
258-
let block = Arc::new(block_data);
259-
self.opts.block_cache.insert_index_block(
260-
self.id,
261-
block_handle.offset(),
262-
Arc::clone(&block),
263-
);
264-
265-
Ok(block)
261+
Err(Error::PartitionBlockExpectedButNotFound)
266262
}
267263

268264
pub(crate) fn get(&self, target: &[u8]) -> Result<Arc<Block>> {
@@ -275,3 +271,114 @@ impl TopLevelIndex {
275271
}
276272
}
277273
}
274+
275+
/// Handles readahead for sequential block access patterns.
276+
pub(crate) struct BlockPrefetcher {
277+
// Current readahead size for FS prefetching
278+
pub(crate) readahead_size: usize,
279+
// Readahead limit for tracking what has been prefetched
280+
readahead_limit: u64,
281+
// Initial auto readahead size for internal prefetch buffer
282+
initial_auto_readahead_size: usize,
283+
// Maximum auto readahead size to cap exponential growth
284+
max_auto_readahead_size: usize,
285+
// Number of sequential file reads for auto readahead
286+
num_sequential_reads: usize,
287+
// Previous access pattern for sequential detection
288+
prev_offset: u64,
289+
prev_len: usize,
290+
}
291+
292+
impl BlockPrefetcher {
293+
pub(crate) fn new(initial_auto_readahead_size: usize, max_auto_readahead_size: usize) -> Self {
294+
let sanitized_initial = if initial_auto_readahead_size > max_auto_readahead_size {
295+
max_auto_readahead_size
296+
} else {
297+
initial_auto_readahead_size
298+
};
299+
300+
Self {
301+
readahead_size: sanitized_initial,
302+
readahead_limit: 0,
303+
initial_auto_readahead_size: sanitized_initial,
304+
max_auto_readahead_size,
305+
num_sequential_reads: 0,
306+
prev_offset: 0,
307+
prev_len: 0,
308+
}
309+
}
310+
311+
/// Update read pattern for sequential detection
312+
pub(crate) fn update_read_pattern(&mut self, offset: u64, len: usize) {
313+
self.prev_offset = offset;
314+
self.prev_len = len;
315+
}
316+
317+
/// Check if block access is sequential
318+
pub(crate) fn is_block_sequential(&self, offset: u64) -> bool {
319+
self.prev_len == 0 || (self.prev_offset + self.prev_len as u64 == offset)
320+
}
321+
322+
pub(crate) fn reset_values(&mut self, initial_auto_readahead_size: usize) {
323+
self.num_sequential_reads = 1;
324+
// Sanitize the initial size against max_auto_readahead_size
325+
let sanitized_initial = initial_auto_readahead_size.min(self.max_auto_readahead_size);
326+
self.initial_auto_readahead_size = sanitized_initial;
327+
self.readahead_size = sanitized_initial;
328+
self.readahead_limit = 0;
329+
}
330+
331+
pub(crate) fn prefetch_if_needed(
332+
&mut self,
333+
handle: &BlockHandle,
334+
file: &dyn crate::vfs::File,
335+
) -> bool {
336+
let len = handle.size() as u64;
337+
let offset = handle.offset() as u64;
338+
339+
// Check if already prefetched
340+
if offset + len <= self.readahead_limit {
341+
self.update_read_pattern(offset, len as usize);
342+
return false;
343+
}
344+
345+
// Check if access is sequential
346+
if !self.is_block_sequential(offset) {
347+
self.update_read_pattern(offset, len as usize);
348+
self.reset_values(self.initial_auto_readahead_size);
349+
return false;
350+
}
351+
352+
self.update_read_pattern(offset, len as usize);
353+
354+
// Auto readahead logic - try FS prefetch
355+
// Disable prefetching if either initial or max readahead size is 0
356+
if self.initial_auto_readahead_size == 0 || self.max_auto_readahead_size == 0 {
357+
return false;
358+
}
359+
360+
self.num_sequential_reads += 1;
361+
if self.num_sequential_reads <= 2 {
362+
// Default num_sequential_reads_for_auto_readahead
363+
return false;
364+
}
365+
366+
// Try FS-level prefetch for auto readahead
367+
if file.supports_prefetch() {
368+
let prefetch_result = file.prefetch(offset, len as usize + self.readahead_size);
369+
if prefetch_result.is_ok() {
370+
self.readahead_limit = offset + len + self.readahead_size as u64;
371+
self.grow_readahead_size();
372+
return false; // FS prefetch succeeded
373+
}
374+
}
375+
376+
true
377+
}
378+
379+
pub(crate) fn grow_readahead_size(&mut self) {
380+
if self.readahead_size < self.max_auto_readahead_size {
381+
self.readahead_size = (self.readahead_size * 2).min(self.max_auto_readahead_size);
382+
}
383+
}
384+
}

src/sstable/table.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::error::{Error, Result};
1313
use crate::sstable::block::{Block, BlockData, BlockHandle, BlockIterator, BlockWriter};
1414
use crate::sstable::error::SSTableError;
1515
use crate::sstable::filter_block::{FilterBlockReader, FilterBlockWriter};
16-
use crate::sstable::index_block::{TopLevelIndex, TopLevelIndexWriter};
16+
use crate::sstable::index_block::{BlockPrefetcher, TopLevelIndex, TopLevelIndexWriter};
1717
use crate::sstable::meta::TableMetadata;
1818
use crate::sstable::{
1919
InternalKey,
@@ -843,6 +843,10 @@ impl Table {
843843
keys_only,
844844
range,
845845
reverse_started: false,
846+
block_prefetcher: BlockPrefetcher::new(
847+
self.opts.initial_auto_readahead_size,
848+
self.opts.max_auto_readahead_size,
849+
),
846850
}
847851
}
848852

@@ -913,6 +917,8 @@ pub(crate) struct TableIterator {
913917
/// Whether reverse iteration has started (to distinguish from just
914918
/// positioned)
915919
reverse_started: bool,
920+
// Block prefetcher for readahead during iteration
921+
block_prefetcher: BlockPrefetcher,
916922
}
917923

918924
impl TableIterator {
@@ -975,6 +981,8 @@ impl TableIterator {
975981
}
976982

977983
fn load_block(&mut self, handle: &BlockHandle) -> Result<()> {
984+
self.block_prefetcher.prefetch_if_needed(handle, self.table.file.as_ref());
985+
978986
let block = self.table.read_block(handle)?;
979987
let mut block_iter = block.iter(self.keys_only)?;
980988

0 commit comments

Comments
 (0)