-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathlib.rs
More file actions
644 lines (555 loc) · 17.5 KB
/
lib.rs
File metadata and controls
644 lines (555 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
mod batch;
pub mod bplustree;
mod cache;
mod checkpoint;
mod clock;
mod commit;
mod compaction;
mod comparator;
mod compression;
mod discard;
mod error;
mod iter;
mod levels;
mod lockfile;
mod lsm;
mod memtable;
mod oracle;
mod snapshot;
mod sstable;
mod task;
mod transaction;
mod vfs;
mod vlog;
mod wal;
#[cfg(test)]
mod test;
use std::path::PathBuf;
use std::sync::Arc;
pub use comparator::{BytewiseComparator, Comparator, InternalKeyComparator, TimestampComparator};
use sstable::bloom::LevelDBBloomFilter;
use crate::clock::{DefaultLogicalClock, LogicalClock};
pub use crate::error::{Error, Result};
pub use crate::lsm::{Tree, TreeBuilder};
pub use crate::transaction::{Durability, Mode, ReadOptions, Transaction, WriteOptions};
/// Conversion of key- or value-like inputs into bytes that defers any
/// allocation until an owned [`Value`] is actually required.
///
/// `as_slice` borrows the underlying bytes without copying. `into_bytes`
/// consumes `self` and yields an owned byte vector: inputs that already own a
/// suitable buffer (`Vec<u8>`, `Box<[u8]>`, `String`) hand it over without
/// copying, while borrowed inputs are copied.
pub trait IntoBytes {
    /// Borrows the bytes of `self` without allocating.
    fn as_slice(&self) -> &[u8];
    /// Converts `self` into an owned byte vector, consuming it.
    fn into_bytes(self) -> Value;
}

impl IntoBytes for &[u8] {
    fn as_slice(&self) -> &[u8] {
        self
    }

    fn into_bytes(self) -> Value {
        self.to_vec()
    }
}

impl<const N: usize> IntoBytes for &[u8; N] {
    fn as_slice(&self) -> &[u8] {
        &self[..]
    }

    fn into_bytes(self) -> Value {
        self.to_vec()
    }
}

impl IntoBytes for Vec<u8> {
    fn as_slice(&self) -> &[u8] {
        // Inherent `Vec::as_slice`, spelled out to avoid any confusion with
        // this trait method.
        Vec::as_slice(self)
    }

    fn into_bytes(self) -> Value {
        // Already the owned representation: no copy.
        self
    }
}

impl IntoBytes for &Vec<u8> {
    fn as_slice(&self) -> &[u8] {
        &self[..]
    }

    fn into_bytes(self) -> Value {
        self.clone()
    }
}

impl IntoBytes for &str {
    fn as_slice(&self) -> &[u8] {
        self.as_bytes()
    }

    fn into_bytes(self) -> Value {
        self.as_bytes().to_vec()
    }
}

impl IntoBytes for String {
    fn as_slice(&self) -> &[u8] {
        self.as_bytes()
    }

    fn into_bytes(self) -> Value {
        // Inherent `String::into_bytes`: hands over the UTF-8 buffer without
        // copying (inherent items take precedence over this trait method).
        String::into_bytes(self)
    }
}

impl IntoBytes for &String {
    fn as_slice(&self) -> &[u8] {
        self.as_bytes()
    }

    fn into_bytes(self) -> Value {
        self.as_bytes().to_vec()
    }
}

impl IntoBytes for Box<[u8]> {
    fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

    fn into_bytes(self) -> Value {
        // Reuses the boxed allocation.
        self.into_vec()
    }
}
/// Owned user key as stored and compared by the LSM tree.
pub type Key = Vec<u8>;
/// Owned value bytes as stored by the LSM tree.
pub type Value = Vec<u8>;
/// Version / timestamp number attached to entries.
pub type Version = u64;

/// Item produced by iterators that yield key/value pairs.
///
/// The value is an `Option` so that keys-only iteration does not have to
/// allocate an empty value for every entry.
pub type IterResult = Result<(Key, Option<Value>)>;
/// Item produced by iterators that yield only keys.
pub type KeysResult = Result<Key>;
/// Item produced by iterators that always yield both a key and its value.
pub type RangeResult = Result<(Key, Value)>;
/// How strictly checksums of values stored in the value log (`VLog`) are
/// verified.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum VLogChecksumLevel {
    /// No verification at all (the default). Fastest option, but it offers
    /// no data-integrity protection.
    #[default]
    Disabled = 0,
    /// Recompute the checksum over the value content and verify it.
    Full = 1,
}
/// Policy applied when corruption is found in the write-ahead log (WAL)
/// during crash recovery.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum WalRecoveryMode {
    /// Default: try to repair corrupted WAL segments automatically, then
    /// retry the replay.
    #[default]
    TolerateCorruptedWithRepair,
    /// Stop with an error on the first sign of WAL corruption; no repair is
    /// attempted.
    AbsoluteConsistency,
}
/// Configuration for an LSM tree instance.
///
/// Start from [`Options::new`] / [`Default`] and adjust with the `with_*`
/// builder methods, then call [`Options::validate`] to check cross-field
/// constraints.
#[derive(Clone)]
pub struct Options {
    /// Target size in bytes of SSTable data blocks. Default: 64 KiB.
    pub block_size: usize,
    /// Restart interval used when building SSTable blocks. Default: 16.
    pub block_restart_interval: usize,
    /// Filter policy used for SSTables; `None` configures no filter.
    /// Default: `LevelDBBloomFilter::new(10)`.
    pub filter_policy: Option<Arc<dyn FilterPolicy>>,
    /// User-key comparator. Default: bytewise.
    ///
    /// Prefer [`Options::with_comparator`] over assigning this field
    /// directly: that method also rebuilds `internal_comparator` from the
    /// same comparator, which a direct assignment does not.
    pub comparator: Arc<dyn Comparator>,
    /// Internal-key comparator wrapping `comparator`.
    pub(crate) internal_comparator: Arc<InternalKeyComparator>,
    /// Compression per level, indexed by level number. Empty means no
    /// compression. Default: empty.
    pub compression_per_level: Vec<CompressionType>,
    /// Shared block cache (data blocks, index blocks and VLog values).
    /// Default capacity: 1 MiB.
    pub(crate) block_cache: Arc<cache::BlockCache>,
    /// Root directory of the store. The `wal`, `sstables`, `vlog`,
    /// `manifest`, etc. directories are resolved beneath it.
    pub path: PathBuf,
    /// Number of LSM levels; must be at least 1. Default: 6.
    pub level_count: u8,
    /// Memtable size limit in bytes. Default: 100 MiB.
    pub max_memtable_size: usize,
    /// Target size in bytes of partitioned index blocks. Default: 16 KiB.
    pub index_partition_size: usize,
    // VLog configuration
    /// Maximum size in bytes of a single `VLog` file. Default: 256 MiB.
    pub vlog_max_file_size: u64,
    /// Checksum verification level for `VLog` values. Default: disabled.
    pub vlog_checksum_verification: VLogChecksumLevel,
    /// Enables the value log (`VLog`). When false, no `VLog` is created.
    /// Required when versioning is enabled. Default: false.
    pub enable_vlog: bool,
    /// Discard ratio threshold for triggering `VLog` garbage collection, in
    /// `0.0..=1.0`. Default: 0.5 (50% discardable data triggers GC).
    pub vlog_gc_discard_ratio: f64,
    /// Values smaller than this many bytes are stored inline in the
    /// `SSTable`. Must be 0 when versioning is enabled. Default: 4 KiB.
    pub vlog_value_threshold: usize,
    // Versioned query configuration
    /// If true, enables versioned queries with timestamp tracking.
    /// Default: false.
    pub enable_versioning: bool,
    /// History retention period in nanoseconds (0 means no retention limit).
    /// Default: 0 (no retention limit).
    pub versioned_history_retention_ns: u64,
    /// Logical clock for time-based operations.
    pub(crate) clock: Arc<dyn LogicalClock>,
    // Shutdown configuration
    /// If true, flush the active memtable to an SSTable during shutdown.
    /// If false, skip the flush for a faster shutdown.
    ///
    /// DEFAULT: true
    pub flush_on_close: bool,
    // WAL recovery configuration
    /// Controls behavior when WAL corruption is detected during recovery.
    /// Default: `TolerateCorruptedWithRepair` (attempt repair and continue).
    pub wal_recovery_mode: WalRecoveryMode,
    /// Maximum automatic readahead size in bytes. Default: 256 KiB.
    pub max_auto_readahead_size: usize,
    /// Initial automatic readahead size in bytes. Default: 8 KiB.
    pub initial_auto_readahead_size: usize,
}
impl Default for Options {
fn default() -> Self {
let bf = LevelDBBloomFilter::new(10);
// Initialize the logical clock
let clock = Arc::new(DefaultLogicalClock::new());
let comparator: Arc<dyn Comparator> = Arc::new(crate::BytewiseComparator {});
let internal_comparator = Arc::new(InternalKeyComparator::new(Arc::clone(&comparator)));
Self {
block_size: 64 * 1024, // 64KB
block_restart_interval: 16,
comparator,
internal_comparator,
compression_per_level: Vec::new(),
filter_policy: Some(Arc::new(bf)),
block_cache: Arc::new(cache::BlockCache::with_capacity_bytes(1 << 20)), // 1MB cache
path: PathBuf::from(""),
level_count: 6,
max_memtable_size: 100 * 1024 * 1024, // 100 MB
index_partition_size: 16384, // 16KB
vlog_max_file_size: 256 * 1024 * 1024, // 256MB
vlog_checksum_verification: VLogChecksumLevel::Disabled,
enable_vlog: false,
vlog_gc_discard_ratio: 0.5, // 50% default
vlog_value_threshold: 4096, // 4KB default
enable_versioning: false,
versioned_history_retention_ns: 0, // No retention limit by default
clock,
flush_on_close: true,
wal_recovery_mode: WalRecoveryMode::default(),
max_auto_readahead_size: 256 * 1024, // 256KB
initial_auto_readahead_size: 8 * 1024, // 8KB
}
}
}
impl Options {
    /// Creates a configuration populated with the values of
    /// [`Options::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the target size, in bytes, of SSTable data blocks.
    pub const fn with_block_size(mut self, value: usize) -> Self {
        self.block_size = value;
        self
    }

    /// Sets the restart interval used when building SSTable blocks.
    pub const fn with_block_restart_interval(mut self, value: usize) -> Self {
        self.block_restart_interval = value;
        self
    }

    /// Sets the SSTable filter policy; pass `None` to configure no filter.
    pub fn with_filter_policy(mut self, value: Option<Arc<dyn FilterPolicy>>) -> Self {
        self.filter_policy = value;
        self
    }

    /// Sets the user-key comparator.
    ///
    /// The internal-key comparator is rebuilt from the same `Arc`, so the two
    /// stay consistent. Prefer this over assigning `comparator` directly.
    pub fn with_comparator(mut self, value: Arc<dyn Comparator>) -> Self {
        self.internal_comparator = Arc::new(InternalKeyComparator::new(Arc::clone(&value)));
        self.comparator = value;
        self
    }

    /// Disables compression for data blocks in SSTables.
    ///
    /// This clears the compression_per_level vector, causing all levels
    /// to default to CompressionType::None.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use surrealkv::Options;
    ///
    /// let opts = Options::new().without_compression();
    /// ```
    pub fn without_compression(mut self) -> Self {
        self.compression_per_level = Vec::new();
        self
    }

    /// Sets compression per level. Vector index corresponds to level number.
    ///
    /// If the vector is shorter than the level count, the last compression
    /// type is used for higher levels. If the vector is empty, no compression
    /// is applied (equivalent to [`Options::without_compression`]).
    ///
    /// # Example
    ///
    /// ```no_run
    /// use surrealkv::{Options, CompressionType};
    ///
    /// let opts = Options::new()
    ///     .with_compression_per_level(vec![
    ///         CompressionType::None,              // L0: no compression
    ///         CompressionType::SnappyCompression, // L1+: Snappy compression
    ///     ]);
    /// ```
    pub fn with_compression_per_level(mut self, levels: Vec<CompressionType>) -> Self {
        self.compression_per_level = levels;
        self
    }

    /// Convenience method: no compression on L0, Snappy compression on other
    /// levels. Equivalent to
    /// `with_compression_per_level(vec![CompressionType::None,
    /// CompressionType::SnappyCompression])`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use surrealkv::Options;
    ///
    /// let opts = Options::new().with_l0_no_compression();
    /// ```
    pub fn with_l0_no_compression(mut self) -> Self {
        self.compression_per_level =
            vec![CompressionType::None, CompressionType::SnappyCompression];
        self
    }

    /// Sets the root directory of the store. The `wal`, `sstables`, `vlog`,
    /// `manifest`, etc. directories are resolved beneath it.
    pub fn with_path(mut self, value: PathBuf) -> Self {
        self.path = value;
        self
    }

    /// Sets the number of LSM levels. [`Options::validate`] rejects 0.
    pub const fn with_level_count(mut self, value: u8) -> Self {
        self.level_count = value;
        self
    }

    /// Sets the memtable size limit in bytes.
    pub const fn with_max_memtable_size(mut self, value: usize) -> Self {
        self.max_memtable_size = value;
        self
    }

    /// Sets the unified block cache capacity (includes data blocks, index
    /// blocks, and VLog values).
    ///
    /// This replaces the current cache with a newly constructed one of the
    /// requested capacity.
    pub fn with_block_cache_capacity(mut self, capacity_bytes: u64) -> Self {
        self.block_cache = Arc::new(cache::BlockCache::with_capacity_bytes(capacity_bytes));
        self
    }

    /// Sets the target size, in bytes, of partitioned index blocks.
    pub const fn with_index_partition_size(mut self, size: usize) -> Self {
        self.index_partition_size = size;
        self
    }

    /// Sets the maximum size, in bytes, of a single `VLog` file.
    pub const fn with_vlog_max_file_size(mut self, value: u64) -> Self {
        self.vlog_max_file_size = value;
        self
    }

    /// Sets the checksum verification level for `VLog` values.
    pub const fn with_vlog_checksum_verification(mut self, value: VLogChecksumLevel) -> Self {
        self.vlog_checksum_verification = value;
        self
    }

    /// Enables (`true`) or disables (`false`) the value log (`VLog`).
    pub const fn with_enable_vlog(mut self, value: bool) -> Self {
        self.enable_vlog = value;
        self
    }

    /// Sets the `VLog` garbage collection discard ratio.
    ///
    /// # Panics
    ///
    /// Panics if the value is not between 0.0 and 1.0 (inclusive); NaN is
    /// rejected as well.
    pub fn with_vlog_gc_discard_ratio(mut self, value: f64) -> Self {
        assert!((0.0..=1.0).contains(&value), "VLog GC discard ratio must be between 0.0 and 1.0");
        self.vlog_gc_discard_ratio = value;
        self
    }

    /// Enables or disables versioned queries with timestamp tracking, and
    /// sets the history retention period (in nanoseconds, 0 = unlimited).
    ///
    /// When `value` is true, this also sets `enable_vlog = true` and
    /// `vlog_value_threshold = 0`, as required by [`Options::validate`].
    /// When `value` is false, only the flag and retention period are updated;
    /// the `VLog` settings are left as they are.
    pub fn with_versioning(mut self, value: bool, retention_ns: u64) -> Self {
        self.enable_versioning = value;
        self.versioned_history_retention_ns = retention_ns;
        if value {
            // Versioned queries require VLog to be enabled
            self.enable_vlog = true;
            // All values should go to VLog for versioned queries
            self.vlog_value_threshold = 0;
        }
        self
    }

    /// Controls whether to flush the active memtable during database shutdown.
    ///
    /// When enabled, ensures all in-memory data is persisted to SSTables before
    /// closing, at the cost of slower shutdown. When disabled, allows faster
    /// shutdown but unpersisted data in the active memtable will be lost if
    /// WAL is not enabled.
    ///
    /// # Arguments
    ///
    /// * `value` - If true, flush on close. If false, skip flush.
    ///
    /// # Default
    ///
    /// true (flush on close), as set by [`Options::default`]
    pub const fn with_flush_on_close(mut self, value: bool) -> Self {
        self.flush_on_close = value;
        self
    }

    /// Sets the WAL recovery mode to control behavior when corruption is
    /// detected.
    ///
    /// # Arguments
    ///
    /// * `mode` - The recovery mode to use:
    ///   - `TolerateCorruptedWithRepair`: Attempt repair and continue (default)
    ///   - `AbsoluteConsistency`: Fail immediately on any corruption
    ///
    /// # Default
    ///
    /// `TolerateCorruptedWithRepair`
    pub const fn with_wal_recovery_mode(mut self, mode: WalRecoveryMode) -> Self {
        self.wal_recovery_mode = mode;
        self
    }

    /// Sets the maximum automatic readahead size in bytes.
    pub const fn with_max_auto_readahead_size(mut self, size: usize) -> Self {
        self.max_auto_readahead_size = size;
        self
    }

    /// Sets the initial automatic readahead size in bytes.
    pub const fn with_initial_auto_readahead_size(mut self, size: usize) -> Self {
        self.initial_auto_readahead_size = size;
        self
    }

    /// Returns the path for a manifest file with the given ID.
    /// Format: `{path}/manifest/{id:020}.manifest`
    pub(crate) fn manifest_file_path(&self, id: u64) -> PathBuf {
        self.manifest_dir().join(format!("{id:020}.manifest"))
    }

    /// Returns the path for an `SSTable` file with the given ID.
    /// Format: `{path}/sstables/{id:020}.sst`
    pub(crate) fn sstable_file_path(&self, id: u64) -> PathBuf {
        self.sstable_dir().join(format!("{id:020}.sst"))
    }

    /// Returns the path for a `VLog` file with the given ID.
    /// Format: `{path}/vlog/{id:020}.vlog`
    pub(crate) fn vlog_file_path(&self, id: u64) -> PathBuf {
        self.vlog_dir().join(format!("{id:020}.vlog"))
    }

    /// Returns the directory path for WAL files (`{path}/wal`).
    pub(crate) fn wal_dir(&self) -> PathBuf {
        self.path.join("wal")
    }

    /// Returns the directory path for `SSTable` files (`{path}/sstables`).
    pub(crate) fn sstable_dir(&self) -> PathBuf {
        self.path.join("sstables")
    }

    /// Returns the directory path for `VLog` files (`{path}/vlog`).
    pub(crate) fn vlog_dir(&self) -> PathBuf {
        self.path.join("vlog")
    }

    /// Returns the directory path for manifest files (`{path}/manifest`).
    pub(crate) fn manifest_dir(&self) -> PathBuf {
        self.path.join("manifest")
    }

    /// Returns the directory path for discard stats files
    /// (`{path}/discard_stats`).
    pub(crate) fn discard_stats_dir(&self) -> PathBuf {
        self.path.join("discard_stats")
    }

    /// Returns the directory path for delete list files (`{path}/delete_list`).
    pub(crate) fn delete_list_dir(&self) -> PathBuf {
        self.path.join("delete_list")
    }

    /// Returns the directory path for versioned index files
    /// (`{path}/versioned_index`).
    pub(crate) fn versioned_index_dir(&self) -> PathBuf {
        self.path.join("versioned_index")
    }

    /// Cheap pre-check for names produced by [`Options::vlog_file_path`]
    /// (20-digit zero-padded ID + `.vlog`, 25 characters total).
    ///
    /// Only the total length and the extension (compared case-insensitively)
    /// are checked. Whether the stem consists of digits is verified by
    /// [`Options::extract_vlog_file_id`].
    pub(crate) fn is_vlog_filename(&self, filename: &str) -> bool {
        filename.len() == 25
            && std::path::Path::new(filename)
                .extension()
                .is_some_and(|ext| ext.eq_ignore_ascii_case("vlog"))
    }

    /// Extracts the file ID from a `VLog` filename.
    ///
    /// Returns `None` if the name fails [`Options::is_vlog_filename`], if it
    /// does not end in the exact lowercase `.vlog` suffix, if the 20-character
    /// stem is not all ASCII digits, or if the ID does not fit in a `u32`.
    ///
    /// NOTE(review): `vlog_file_path` accepts `u64` IDs, but IDs above
    /// `u32::MAX` yield `None` here. Confirm that VLog IDs never exceed that
    /// range.
    pub(crate) fn extract_vlog_file_id(&self, filename: &str) -> Option<u32> {
        if !self.is_vlog_filename(filename) {
            return None;
        }
        let id_part = filename.strip_suffix(".vlog")?;
        if id_part.len() != 20 || !id_part.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        id_part.parse::<u32>().ok()
    }

    /// Validates the configuration options for consistency and correctness.
    ///
    /// This should be called when the store starts, to catch configuration
    /// errors early.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidArgument`] (first failing check wins) if:
    /// * `vlog_gc_discard_ratio` is outside `0.0..=1.0` (including NaN);
    /// * versioning is enabled but `enable_vlog` is false;
    /// * versioning is enabled but `vlog_value_threshold` is non-zero;
    /// * `level_count` is 0.
    pub fn validate(&self) -> Result<()> {
        // Validate VLog GC discard ratio
        if !(0.0..=1.0).contains(&self.vlog_gc_discard_ratio) {
            return Err(Error::InvalidArgument(
                "VLog GC discard ratio must be between 0.0 and 1.0".to_string(),
            ));
        }
        // Validate versioned queries configuration
        if self.enable_versioning {
            // Versioned queries require VLog to be enabled
            if !self.enable_vlog {
                return Err(Error::InvalidArgument(
                    "Versioned queries require VLog to be enabled. Set enable_vlog to true."
                        .to_string(),
                ));
            }
            // Versioned queries don't work well with value threshold (values
            // should go to VLog)
            if self.vlog_value_threshold > 0 {
                return Err(Error::InvalidArgument(
                    "Versioned queries require all values to be stored in VLog. Set vlog_value_threshold to 0.".to_string(),
                ));
            }
        }
        // Validate level count is reasonable
        if self.level_count == 0 {
            return Err(Error::InvalidArgument("Level count must be at least 1".to_string()));
        }
        Ok(())
    }
}
/// Compression codec applied to SSTable data blocks.
///
/// The explicit discriminants are the numeric tags for each codec.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompressionType {
    /// Blocks are stored uncompressed (discriminant 0).
    None = 0,
    /// Blocks are compressed with Snappy (discriminant 1).
    SnappyCompression = 1,
}

impl CompressionType {
    /// Short, lowercase, human-readable name of the codec.
    pub const fn as_str(&self) -> &'static str {
        match self {
            CompressionType::SnappyCompression => "snappy",
            CompressionType::None => "none",
        }
    }
}
impl TryFrom<u8> for CompressionType {
type Error = Error;
fn try_from(byte: u8) -> Result<Self> {
match byte {
0 => Ok(Self::None),
1 => Ok(Self::SnappyCompression),
_ => Err(Error::Compression(format!("Unknown compression type: {}", byte))),
}
}
}
/// Builds and queries per-SSTable filters (e.g. Bloom filters) that can rule
/// out keys without reading data blocks.
///
/// Implementations must be `Send + Sync` so that one policy instance can be
/// shared across threads.
pub trait FilterPolicy: Send + Sync {
    /// Identifier of this policy.
    ///
    /// Whenever the filter encoding changes incompatibly, the returned name
    /// must change too; otherwise filters written with the old encoding could
    /// be handed to this policy's methods.
    fn name(&self) -> &str;

    /// Reports whether `key` may be present in the encoded `filter`.
    ///
    /// False positives are allowed: the result can be `true` for a key that
    /// was never added.
    fn may_contain(&self, filter: &[u8], key: &[u8]) -> bool;

    /// Builds an encoded filter covering all of `keys`.
    fn create_filter(&self, keys: &[Vec<u8>]) -> Vec<u8>;
}
use std::ops::Bound;
/// A single bound on an `InternalKey`.
pub(crate) type InternalKeyRangeBound = Bound<sstable::InternalKey>;
/// A `(lower, upper)` pair of [`InternalKeyRangeBound`]s.
pub(crate) type InternalKeyRange = (InternalKeyRangeBound, InternalKeyRangeBound);

/// Translates a range over user keys into a range over internal keys.
///
/// Every bounded side becomes an `InternalKey` built from a copy of the same
/// user key and one of two fixed trailers:
///
/// * `Included` lower and `Excluded` upper bounds use
///   `INTERNAL_KEY_SEQ_NUM_MAX`, `InternalKeyKind::Max` and
///   `INTERNAL_KEY_TIMESTAMP_MAX`;
/// * `Excluded` lower and `Included` upper bounds use sequence number 0,
///   `InternalKeyKind::Set` and timestamp 0.
///
/// `Unbounded` sides stay `Unbounded`.
///
/// NOTE(review): this relies on the internal-key ordering placing the
/// `*_MAX` trailer before, and the zero trailer after, the stored versions of
/// the same user key. Confirm against `InternalKeyComparator`.
pub(crate) fn user_range_to_internal_range(
    lower: Bound<&[u8]>,
    upper: Bound<&[u8]>,
) -> InternalKeyRange {
    use sstable::{
        InternalKey,
        InternalKeyKind,
        INTERNAL_KEY_SEQ_NUM_MAX,
        INTERNAL_KEY_TIMESTAMP_MAX,
    };

    // Internal key for `user_key` carrying the `*_MAX` sequence/kind/timestamp.
    let with_max_trailer = |user_key: &[u8]| {
        InternalKey::new(
            user_key.into_bytes(),
            INTERNAL_KEY_SEQ_NUM_MAX,
            InternalKeyKind::Max,
            INTERNAL_KEY_TIMESTAMP_MAX,
        )
    };
    // Internal key for `user_key` carrying sequence 0, kind `Set`, timestamp 0.
    let with_zero_trailer =
        |user_key: &[u8]| InternalKey::new(user_key.into_bytes(), 0, InternalKeyKind::Set, 0);

    let start = match lower {
        Bound::Included(k) => Bound::Included(with_max_trailer(k)),
        Bound::Excluded(k) => Bound::Excluded(with_zero_trailer(k)),
        Bound::Unbounded => Bound::Unbounded,
    };
    let end = match upper {
        Bound::Included(k) => Bound::Included(with_zero_trailer(k)),
        Bound::Excluded(k) => Bound::Excluded(with_max_trailer(k)),
        Bound::Unbounded => Bound::Unbounded,
    };

    (start, end)
}