11// Note: Needs to be tested if a top-level index improves performance as such.
22// TODO: Replace the current non-partitioned index block writer with this
33use std:: cmp:: Ordering ;
4+ use std:: collections:: HashMap ;
45use std:: io:: Write ;
56use std:: sync:: Arc ;
67
@@ -183,9 +184,7 @@ pub(crate) struct TopLevelIndex {
183184 pub ( crate ) id : u64 ,
184185 pub ( crate ) opts : Arc < Options > ,
185186 pub ( crate ) blocks : Vec < BlockHandleWithKey > ,
186- // TODO: Fix this, as this could be problematic if the file is being shared across without any
187- // mutex
188- pub ( crate ) file : Arc < dyn File > ,
187+ pub ( crate ) partition_map : HashMap < u64 , Arc < Block > > ,
189188}
190189
191190impl TopLevelIndex {
@@ -199,20 +198,29 @@ impl TopLevelIndex {
199198 read_table_block ( Arc :: clone ( & opt. internal_comparator ) , Arc :: clone ( & f) , location) ?;
200199 let iter = block. iter ( false ) ?;
201200 let mut blocks = Vec :: new ( ) ;
201+ let mut partition_map = HashMap :: new ( ) ;
202202 for item in iter {
203203 let ( key, handle) = item?;
204204 // Store full encoded internal key for correct partition lookup
205205 let ( handle, _) = BlockHandle :: decode ( & handle) ?;
206- blocks . push ( BlockHandleWithKey {
206+ let block_handle = BlockHandleWithKey {
207207 separator_key : key,
208208 handle,
209- } ) ;
209+ } ;
210+ let block = Arc :: new ( read_table_block (
211+ Arc :: clone ( & opt. internal_comparator ) ,
212+ Arc :: clone ( & f) ,
213+ & block_handle. handle ,
214+ ) ?) ;
215+
216+ partition_map. insert ( block_handle. offset ( ) , block) ;
217+ blocks. push ( block_handle) ;
210218 }
211219 Ok ( TopLevelIndex {
212220 id,
213221 opts : opt,
214222 blocks,
215- file : Arc :: clone ( & f ) ,
223+ partition_map ,
216224 } )
217225 }
218226
@@ -246,23 +254,11 @@ impl TopLevelIndex {
246254 }
247255
248256 pub ( crate ) fn load_block ( & self , block_handle : & BlockHandleWithKey ) -> Result < Arc < Block > > {
249- if let Some ( block) = self . opts . block_cache . get_index_block ( self . id , block_handle. offset ( ) ) {
250- return Ok ( block) ;
257+ if let Some ( block) = self . partition_map . get ( & block_handle. offset ( ) ) {
258+ return Ok ( Arc :: clone ( block) ) ;
251259 }
252260
253- let block_data = read_table_block (
254- Arc :: clone ( & self . opts . internal_comparator ) ,
255- Arc :: clone ( & self . file ) ,
256- & block_handle. handle ,
257- ) ?;
258- let block = Arc :: new ( block_data) ;
259- self . opts . block_cache . insert_index_block (
260- self . id ,
261- block_handle. offset ( ) ,
262- Arc :: clone ( & block) ,
263- ) ;
264-
265- Ok ( block)
261+ Err ( Error :: PartitionBlockExpectedButNotFound )
266262 }
267263
268264 pub ( crate ) fn get ( & self , target : & [ u8 ] ) -> Result < Arc < Block > > {
@@ -275,3 +271,114 @@ impl TopLevelIndex {
275271 }
276272 }
277273}
274+
/// Handles readahead for sequential block access patterns.
pub(crate) struct BlockPrefetcher {
    /// Current readahead size used for FS prefetching.
    pub(crate) readahead_size: usize,
    /// Upper bound (exclusive) of the region already prefetched.
    readahead_limit: u64,
    /// Initial auto readahead size for the internal prefetch buffer.
    initial_auto_readahead_size: usize,
    /// Cap for the exponential growth of `readahead_size`.
    max_auto_readahead_size: usize,
    /// Count of consecutive sequential file reads observed so far.
    num_sequential_reads: usize,
    /// Offset of the previous access, used for sequential detection.
    prev_offset: u64,
    /// Length of the previous access, used for sequential detection.
    prev_len: usize,
}
291+
292+ impl BlockPrefetcher {
293+ pub ( crate ) fn new ( initial_auto_readahead_size : usize , max_auto_readahead_size : usize ) -> Self {
294+ let sanitized_initial = if initial_auto_readahead_size > max_auto_readahead_size {
295+ max_auto_readahead_size
296+ } else {
297+ initial_auto_readahead_size
298+ } ;
299+
300+ Self {
301+ readahead_size : sanitized_initial,
302+ readahead_limit : 0 ,
303+ initial_auto_readahead_size : sanitized_initial,
304+ max_auto_readahead_size,
305+ num_sequential_reads : 0 ,
306+ prev_offset : 0 ,
307+ prev_len : 0 ,
308+ }
309+ }
310+
311+ /// Update read pattern for sequential detection
312+ pub ( crate ) fn update_read_pattern ( & mut self , offset : u64 , len : usize ) {
313+ self . prev_offset = offset;
314+ self . prev_len = len;
315+ }
316+
317+ /// Check if block access is sequential
318+ pub ( crate ) fn is_block_sequential ( & self , offset : u64 ) -> bool {
319+ self . prev_len == 0 || ( self . prev_offset + self . prev_len as u64 == offset)
320+ }
321+
322+ pub ( crate ) fn reset_values ( & mut self , initial_auto_readahead_size : usize ) {
323+ self . num_sequential_reads = 1 ;
324+ // Sanitize the initial size against max_auto_readahead_size
325+ let sanitized_initial = initial_auto_readahead_size. min ( self . max_auto_readahead_size ) ;
326+ self . initial_auto_readahead_size = sanitized_initial;
327+ self . readahead_size = sanitized_initial;
328+ self . readahead_limit = 0 ;
329+ }
330+
331+ pub ( crate ) fn prefetch_if_needed (
332+ & mut self ,
333+ handle : & BlockHandle ,
334+ file : & dyn crate :: vfs:: File ,
335+ ) -> bool {
336+ let len = handle. size ( ) as u64 ;
337+ let offset = handle. offset ( ) as u64 ;
338+
339+ // Check if already prefetched
340+ if offset + len <= self . readahead_limit {
341+ self . update_read_pattern ( offset, len as usize ) ;
342+ return false ;
343+ }
344+
345+ // Check if access is sequential
346+ if !self . is_block_sequential ( offset) {
347+ self . update_read_pattern ( offset, len as usize ) ;
348+ self . reset_values ( self . initial_auto_readahead_size ) ;
349+ return false ;
350+ }
351+
352+ self . update_read_pattern ( offset, len as usize ) ;
353+
354+ // Auto readahead logic - try FS prefetch
355+ // Disable prefetching if either initial or max readahead size is 0
356+ if self . initial_auto_readahead_size == 0 || self . max_auto_readahead_size == 0 {
357+ return false ;
358+ }
359+
360+ self . num_sequential_reads += 1 ;
361+ if self . num_sequential_reads <= 2 {
362+ // Default num_sequential_reads_for_auto_readahead
363+ return false ;
364+ }
365+
366+ // Try FS-level prefetch for auto readahead
367+ if file. supports_prefetch ( ) {
368+ let prefetch_result = file. prefetch ( offset, len as usize + self . readahead_size ) ;
369+ if prefetch_result. is_ok ( ) {
370+ self . readahead_limit = offset + len + self . readahead_size as u64 ;
371+ self . grow_readahead_size ( ) ;
372+ return false ; // FS prefetch succeeded
373+ }
374+ }
375+
376+ true
377+ }
378+
379+ pub ( crate ) fn grow_readahead_size ( & mut self ) {
380+ if self . readahead_size < self . max_auto_readahead_size {
381+ self . readahead_size = ( self . readahead_size * 2 ) . min ( self . max_auto_readahead_size ) ;
382+ }
383+ }
384+ }
0 commit comments