diff --git a/Cargo.toml b/Cargo.toml index c056513..d9569e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "lsm-tree" description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)" license = "MIT OR Apache-2.0" -version = "2.0.2" +version = "2.1.0" edition = "2021" rust-version = "1.74.0" readme = "README.md" @@ -34,10 +34,11 @@ lz4_flex = { version = "0.11.3", optional = true } miniz_oxide = { version = "0.8.0", optional = true } path-absolutize = "3.1.1" quick_cache = { version = "0.6.5", default-features = false, features = [] } +rustc-hash = "2.0.0" self_cell = "1.0.4" smallvec = { version = "1.13.2" } tempfile = "3.12.0" -value-log = "1.0.0" +value-log = "1.1.0" varint-rs = "2.2.0" xxhash-rust = { version = "0.8.12", features = ["xxh3"] } diff --git a/benches/block.rs b/benches/block.rs index 5749615..88a22c6 100644 --- a/benches/block.rs +++ b/benches/block.rs @@ -1,11 +1,11 @@ use criterion::{criterion_group, criterion_main, Criterion}; use lsm_tree::{ + coding::Encode, segment::{ block::{header::Header as BlockHeader, ItemSize}, meta::CompressionType, value_block::ValueBlock, }, - serde::Serializable, Checksum, InternalValue, }; use std::io::Write; @@ -99,13 +99,56 @@ fn value_block_find(c: &mut Criterion) { } } +fn encode_block(c: &mut Criterion) { + let mut group = c.benchmark_group("Encode block"); + + for comp_type in [ + CompressionType::None, + CompressionType::Lz4, + CompressionType::Miniz(3), + ] { + for block_size in [1, 4, 8, 16, 32, 64, 128] { + let block_size = block_size * 1_024; + + let mut size = 0; + + let mut items = vec![]; + + for x in 0u64.. 
{ + let value = InternalValue::from_components( + x.to_be_bytes(), + x.to_string().repeat(50).as_bytes(), + 63, + lsm_tree::ValueType::Value, + ); + + size += value.size(); + + items.push(value); + + if size >= block_size { + break; + } + } + + group.bench_function(format!("{block_size} KiB [{comp_type}]"), |b| { + b.iter(|| { + // Serialize block + let (mut header, data) = + ValueBlock::to_bytes_compressed(&items, 0, comp_type).unwrap(); + }); + }); + } + } +} + fn load_value_block_from_disk(c: &mut Criterion) { let mut group = c.benchmark_group("Load block from disk"); for comp_type in [ - CompressionType::None, + //CompressionType::None, CompressionType::Lz4, - CompressionType::Miniz(6), + //CompressionType::Miniz(3), ] { for block_size in [1, 4, 8, 16, 32, 64, 128] { let block_size = block_size * 1_024; @@ -133,7 +176,6 @@ fn load_value_block_from_disk(c: &mut Criterion) { // Serialize block let (mut header, data) = ValueBlock::to_bytes_compressed(&items, 0, comp_type).unwrap(); - header.checksum = Checksum::from_bytes(&data); let mut file = tempfile::tempfile().unwrap(); header.encode_into(&mut file).unwrap(); @@ -156,5 +198,10 @@ fn load_value_block_from_disk(c: &mut Criterion) { } } -criterion_group!(benches, value_block_find, load_value_block_from_disk,); +criterion_group!( + benches, + encode_block, + value_block_find, + load_value_block_from_disk, +); criterion_main!(benches); diff --git a/benches/level_manifest.rs b/benches/level_manifest.rs index c7f0928..b26e4d2 100644 --- a/benches/level_manifest.rs +++ b/benches/level_manifest.rs @@ -6,7 +6,7 @@ fn iterate_segments(c: &mut Criterion) { group.sample_size(10); for segment_count in [0, 1, 5, 10, 100, 500, 1_000, 2_000, 4_000] { - group.bench_function(&format!("iterate {segment_count} segments"), |b| { + group.bench_function(format!("iterate {segment_count} segments"), |b| { let folder = tempfile::tempdir_in(".bench").unwrap(); let tree = Config::new(folder).data_block_size(1_024).open().unwrap(); @@ 
-30,7 +30,7 @@ fn find_segment(c: &mut Criterion) { for segment_count in [1u64, 5, 10, 100, 500, 1_000, 2_000, 4_000] { group.bench_function( - &format!("find segment in {segment_count} segments - binary search"), + format!("find segment in {segment_count} segments - binary search"), |b| { let folder = tempfile::tempdir_in(".bench").unwrap(); let tree = Config::new(folder).data_block_size(1_024).open().unwrap(); @@ -49,6 +49,8 @@ fn find_segment(c: &mut Criterion) { .levels .first() .expect("should exist") + .as_disjoint() + .expect("should be disjoint") .get_segment_containing_key(key) .expect("should exist") }); @@ -56,7 +58,7 @@ fn find_segment(c: &mut Criterion) { ); group.bench_function( - &format!("find segment in {segment_count} segments - linear search"), + format!("find segment in {segment_count} segments - linear search"), |b| { let folder = tempfile::tempdir().unwrap(); let tree = Config::new(folder).data_block_size(1_024).open().unwrap(); diff --git a/benches/tli.rs b/benches/tli.rs index 826af41..d81c271 100644 --- a/benches/tli.rs +++ b/benches/tli.rs @@ -1,5 +1,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::segment::{block_index::BlockIndex, value_block::CachePolicy}; +use lsm_tree::segment::{ + block_index::BlockIndex, value_block::BlockOffset, value_block::CachePolicy, +}; fn tli_find_item(c: &mut Criterion) { use lsm_tree::segment::block_index::{ @@ -15,7 +17,7 @@ fn tli_find_item(c: &mut Criterion) { for x in 0..item_count { items.push(KeyedBlockHandle { end_key: x.to_be_bytes().into(), - offset: x, + offset: BlockOffset(x), }); } diff --git a/src/abstract.rs b/src/abstract.rs index a40bd9e..603c58c 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -19,6 +19,10 @@ pub type RangeItem = crate::Result; #[allow(clippy::module_name_repetitions)] #[enum_dispatch] pub trait AbstractTree { + /// Gets the memory usage of all bloom filters in the tree. 
+ #[cfg(feature = "bloom")] + fn bloom_filter_size(&self) -> usize; + /* /// Imports data from a flat file (see [`Tree::export`]), /// blocking the caller until it is done. /// diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 4db486b..cc1ca9c 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -163,6 +163,11 @@ impl BlobTree { } impl AbstractTree for BlobTree { + #[cfg(feature = "bloom")] + fn bloom_filter_size(&self) -> usize { + self.index.bloom_filter_size() + } + fn sealed_memtable_count(&self) -> usize { self.index.sealed_memtable_count() } @@ -231,7 +236,6 @@ impl AbstractTree for BlobTree { segment_id, data_block_size: self.index.config.data_block_size, index_block_size: self.index.config.index_block_size, - evict_tombstones: false, folder: lsm_segment_folder, })? .use_compression(self.index.config.compression); diff --git a/src/block_cache.rs b/src/block_cache.rs index 1c632d9..d69a35f 100644 --- a/src/block_cache.rs +++ b/src/block_cache.rs @@ -4,6 +4,7 @@ use crate::either::Either::{self, Left, Right}; use crate::segment::id::GlobalSegmentId; +use crate::segment::value_block::BlockOffset; use crate::segment::{block_index::IndexBlock, value_block::ValueBlock}; use quick_cache::Weighter; use quick_cache::{sync::Cache, Equivalent}; @@ -13,16 +14,16 @@ type Item = Either, Arc>; // (Type (disk or index), Segment ID, Block offset) #[derive(Eq, std::hash::Hash, PartialEq)] -struct CacheKey(GlobalSegmentId, u64); +struct CacheKey(GlobalSegmentId, BlockOffset); -impl Equivalent for (GlobalSegmentId, u64) { +impl Equivalent for (GlobalSegmentId, BlockOffset) { fn equivalent(&self, key: &CacheKey) -> bool { self.0 == key.0 && self.1 == key.1 } } -impl From<(GlobalSegmentId, u64)> for CacheKey { - fn from((gid, bid): (GlobalSegmentId, u64)) -> Self { +impl From<(GlobalSegmentId, BlockOffset)> for CacheKey { + fn from((gid, bid): (GlobalSegmentId, BlockOffset)) -> Self { Self(gid, bid) } } @@ -65,7 +66,11 @@ impl Weighter for BlockWeighter { 
/// # Ok::<(), lsm_tree::Error>(()) /// ``` pub struct BlockCache { - data: Cache, + // NOTE: rustc_hash performed best: https://fjall-rs.github.io/post/fjall-2-1 + /// Concurrent cache implementation + data: Cache, + + /// Capacity in bytes capacity: u64, } @@ -75,14 +80,17 @@ impl BlockCache { pub fn with_capacity_bytes(bytes: u64) -> Self { use quick_cache::sync::DefaultLifecycle; + #[allow(clippy::default_trait_access)] + let quick_cache = Cache::with( + 1_000_000, + bytes, + BlockWeighter, + Default::default(), + DefaultLifecycle::default(), + ); + Self { - data: Cache::with( - 1_000_000, - bytes, - BlockWeighter, - xxhash_rust::xxh3::Xxh3Builder::new(), - DefaultLifecycle::default(), - ), + data: quick_cache, capacity: bytes, } } @@ -115,7 +123,7 @@ impl BlockCache { pub fn insert_disk_block( &self, segment_id: GlobalSegmentId, - offset: u64, + offset: BlockOffset, value: Arc, ) { if self.capacity > 0 { @@ -127,7 +135,7 @@ impl BlockCache { pub fn insert_index_block( &self, segment_id: GlobalSegmentId, - offset: u64, + offset: BlockOffset, value: Arc, ) { if self.capacity > 0 { @@ -140,7 +148,7 @@ impl BlockCache { pub fn get_disk_block( &self, segment_id: GlobalSegmentId, - offset: u64, + offset: BlockOffset, ) -> Option> { let key = (segment_id, offset); let item = self.data.get(&key)?; @@ -152,7 +160,7 @@ impl BlockCache { pub fn get_index_block( &self, segment_id: GlobalSegmentId, - offset: u64, + offset: BlockOffset, ) -> Option> { let key = (segment_id, offset); let item = self.data.get(&key)?; diff --git a/src/bloom/mod.rs b/src/bloom/mod.rs index 2bfca95..f963959 100644 --- a/src/bloom/mod.rs +++ b/src/bloom/mod.rs @@ -12,6 +12,7 @@ use bit_array::BitArray; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; +/// Two hashes that are used for double hashing pub type CompositeHash = (u64, u64); /// A standard bloom filter @@ -83,17 +84,12 @@ impl Decode for BloomFilter { } impl BloomFilter { - /// Size of bloom filter in 
bytes + /// Size of bloom filter in bytes. #[must_use] pub fn len(&self) -> usize { self.inner.len() } - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - fn from_raw(m: usize, k: usize, bytes: Box<[u8]>) -> Self { Self { inner: BitArray::from_bytes(bytes), @@ -175,7 +171,7 @@ impl BloomFilter { // NOTE: should be in bounds because of modulo #[allow(clippy::expect_used)] - if !self.inner.get(idx as usize) { + if !self.has_bit(idx as usize) { return false; } @@ -194,7 +190,7 @@ impl BloomFilter { self.contains_hash(Self::get_hash(key)) } - /// Adds the key to the filter + /// Adds the key to the filter. pub fn set_with_hash(&mut self, (mut h1, mut h2): CompositeHash) { for i in 0..(self.k as u64) { let idx = h1 % (self.m as u64); @@ -206,12 +202,17 @@ impl BloomFilter { } } - /// Sets the bit at the given index to `true` + /// Returns `true` if the bit at `idx` is `1`. + fn has_bit(&self, idx: usize) -> bool { + self.inner.get(idx as usize) + } + + /// Sets the bit at the given index to `true`. fn enable_bit(&mut self, idx: usize) { self.inner.set(idx, true); } - /// Gets the hash of a key + /// Gets the hash of a key. 
#[must_use] pub fn get_hash(key: &[u8]) -> CompositeHash { let h0 = xxhash_rust::xxh3::xxh3_128(key); @@ -236,12 +237,21 @@ mod tests { let mut filter = BloomFilter::with_fp_rate(10, 0.0001); - for key in [ + let keys = &[ b"item0", b"item1", b"item2", b"item3", b"item4", b"item5", b"item6", b"item7", b"item8", b"item9", - ] { - filter.set_with_hash(BloomFilter::get_hash(key)); + ]; + + for key in keys { + filter.set_with_hash(BloomFilter::get_hash(*key)); + } + + for key in keys { + assert!(filter.contains(&**key)); + } + assert!(!filter.contains(b"asdasads")); + assert!(!filter.contains(b"item10")); + assert!(!filter.contains(b"cxycxycxy")); filter.encode_into(&mut file)?; file.sync_all()?; @@ -252,6 +262,13 @@ assert_eq!(filter, filter_copy); + for key in keys { + assert!(filter_copy.contains(&**key)); + } + assert!(!filter_copy.contains(b"asdasads")); + assert!(!filter_copy.contains(b"item10")); + assert!(!filter_copy.contains(b"cxycxycxy")); + Ok(()) } diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 3fa5f3c..7e10e72 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -129,6 +129,7 @@ mod tests { block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, + value_block::BlockOffset, Segment, }, time::unix_timestamp, @@ -150,13 +151,13 @@ mod tests { block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), offsets: FileOffsets { - bloom_ptr: 0, - range_filter_ptr: 0, - index_block_ptr: 0, - metadata_ptr: 0, - range_tombstones_ptr: 0, - tli_ptr: 0, - pfx_ptr: 0, + bloom_ptr: BlockOffset(0), + range_filter_ptr: BlockOffset(0), + index_block_ptr: BlockOffset(0), + metadata_ptr: BlockOffset(0), + range_tombstones_ptr: BlockOffset(0), + tli_ptr: BlockOffset(0), + pfx_ptr: BlockOffset(0), }, metadata: Metadata { diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 8742713..4b173be 100644 --- a/src/compaction/leveled.rs +++ 
b/src/compaction/leveled.rs @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ config::Config, key_range::KeyRange, level_manifest::LevelManifest, segment::Segment, HashSet, }; -use std::{ops::Deref, sync::Arc}; +use std::sync::Arc; /// Levelled compaction strategy (LCS) /// @@ -57,24 +57,7 @@ impl Default for Strategy { } fn aggregate_key_range(segments: &[Arc]) -> KeyRange { - let (mut min, mut max) = segments - .first() - .expect("segment should always exist") - .metadata - .key_range - .deref() - .clone(); - - for other in segments.iter().skip(1) { - if other.metadata.key_range.0 < min { - min = other.metadata.key_range.0.clone(); - } - if other.metadata.key_range.1 > max { - max = other.metadata.key_range.1.clone(); - } - } - - KeyRange::new((min, max)) + KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range)) } fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> usize { @@ -138,6 +121,8 @@ impl CompactionStrategy for Strategy { segments_to_compact.push(segment); } + debug_assert!(!segments_to_compact.is_empty()); + let Some(next_level) = &resolved_view.get(next_level_index as usize) else { break; }; @@ -241,6 +226,7 @@ mod tests { block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, + value_block::BlockOffset, Segment, }, time::unix_timestamp, @@ -275,13 +261,13 @@ mod tests { block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), offsets: FileOffsets { - bloom_ptr: 0, - range_filter_ptr: 0, - index_block_ptr: 0, - metadata_ptr: 0, - range_tombstones_ptr: 0, - tli_ptr: 0, - pfx_ptr: 0, + bloom_ptr: BlockOffset(0), + range_filter_ptr: BlockOffset(0), + index_block_ptr: BlockOffset(0), + metadata_ptr: BlockOffset(0), + range_tombstones_ptr: BlockOffset(0), + tli_ptr: BlockOffset(0), + pfx_ptr: BlockOffset(0), }, metadata: Metadata { diff --git a/src/compaction/maintenance.rs 
b/src/compaction/maintenance.rs index d296073..8ec8350 100644 --- a/src/compaction/maintenance.rs +++ b/src/compaction/maintenance.rs @@ -87,7 +87,7 @@ mod tests { level_manifest::LevelManifest, segment::{ block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, - meta::Metadata, Segment, + meta::Metadata, value_block::BlockOffset, Segment, }, }; use std::sync::Arc; @@ -106,13 +106,13 @@ mod tests { block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), offsets: FileOffsets { - bloom_ptr: 0, - range_filter_ptr: 0, - index_block_ptr: 0, - metadata_ptr: 0, - range_tombstones_ptr: 0, - tli_ptr: 0, - pfx_ptr: 0, + bloom_ptr: BlockOffset(0), + range_filter_ptr: BlockOffset(0), + index_block_ptr: BlockOffset(0), + metadata_ptr: BlockOffset(0), + range_tombstones_ptr: BlockOffset(0), + tli_ptr: BlockOffset(0), + pfx_ptr: BlockOffset(0), }, metadata: Metadata { diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index f2e1b67..7e0605a 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -134,6 +134,7 @@ mod tests { block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, + value_block::BlockOffset, Segment, }, SeqNo, @@ -154,13 +155,13 @@ mod tests { block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), offsets: FileOffsets { - bloom_ptr: 0, - range_filter_ptr: 0, - index_block_ptr: 0, - metadata_ptr: 0, - range_tombstones_ptr: 0, - tli_ptr: 0, - pfx_ptr: 0, + bloom_ptr: BlockOffset(0), + range_filter_ptr: BlockOffset(0), + index_block_ptr: BlockOffset(0), + metadata_ptr: BlockOffset(0), + range_tombstones_ptr: BlockOffset(0), + tli_ptr: BlockOffset(0), + pfx_ptr: BlockOffset(0), }, metadata: Metadata { diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 05eb549..2b34175 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -169,7 +169,6 @@ fn merge_segments( // NOTE: Only 
evict tombstones when reaching the last level, // That way we don't resurrect data beneath the tombstone let is_last_level = payload.dest_level == last_level; - let should_evict_tombstones = is_last_level; let start = Instant::now(); @@ -178,7 +177,6 @@ fn merge_segments( payload.target_size, crate::segment::writer::Options { folder: segments_base_folder.clone(), - evict_tombstones: should_evict_tombstones, segment_id: 0, // TODO: this is never used in MultiWriter data_block_size: opts.config.data_block_size, index_block_size: opts.config.index_block_size, @@ -207,7 +205,14 @@ fn merge_segments( } for (idx, item) in merge_iter.enumerate() { - segment_writer.write(item?)?; + let item = item?; + + // IMPORTANT: We can only drop tombstones when writing into last level + if is_last_level && item.is_tombstone() { + continue; + } + + segment_writer.write(item)?; if idx % 100_000 == 0 && opts.stop_signal.is_stopped() { log::debug!("compactor: stopping amidst compaction because of stop signal"); @@ -265,10 +270,10 @@ fn merge_segments( io::{Seek, SeekFrom}, }; - assert!(bloom_ptr > 0, "can not find bloom filter block"); + assert!(*bloom_ptr > 0, "can not find bloom filter block"); let mut reader = File::open(&segment_file_path)?; - reader.seek(SeekFrom::Start(bloom_ptr))?; + reader.seek(SeekFrom::Start(*bloom_ptr))?; BloomFilter::decode_from(&mut reader)? 
}, })) diff --git a/src/key_range.rs b/src/key_range.rs index eea2f83..7afc754 100644 --- a/src/key_range.rs +++ b/src/key_range.rs @@ -16,6 +16,17 @@ use std::{ #[derive(Clone, Debug, PartialEq, Eq)] pub struct KeyRange((UserKey, UserKey)); +impl std::fmt::Display for KeyRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "[{}<=>{}]", + String::from_utf8_lossy(&self.0 .0), + String::from_utf8_lossy(&self.0 .1) + ) + } +} + impl std::ops::Deref for KeyRange { type Target = (UserKey, UserKey); @@ -29,6 +40,18 @@ impl KeyRange { Self(range) } + pub fn empty() -> Self { + Self((Slice::new(b""), Slice::new(b""))) + } + + fn min(&self) -> &UserKey { + &self.0 .0 + } + + fn max(&self) -> &UserKey { + &self.0 .1 + } + /// Returns `true` if the list of key ranges is disjoint pub fn is_disjoint(ranges: &[&Self]) -> bool { for (idx, a) in ranges.iter().enumerate() { @@ -92,6 +115,34 @@ impl KeyRange { lo_included && hi_included } + + /// Aggregates a key range. 
+ /// + /// # Panics + /// + /// The iterator must not be empty + pub fn aggregate<'a>(mut iter: impl Iterator) -> Self { + // NOTE: See function documentation + #[allow(clippy::expect_used)] + let first = iter.next().expect("should not be empty"); + + let mut min = first.min(); + let mut max = first.max(); + + for other in iter { + let x = other.min(); + if x < min { + min = x; + } + + let x = other.max(); + if x > max { + max = x; + } + } + + Self((min.clone(), max.clone())) + } } impl Encode for KeyRange { @@ -129,6 +180,7 @@ impl Decode for KeyRange { #[cfg(test)] mod tests { use super::*; + use test_log::test; fn int_key_range(a: u64, b: u64) -> KeyRange { KeyRange::new((a.to_be_bytes().into(), b.to_be_bytes().into())) @@ -138,8 +190,22 @@ mod tests { KeyRange::new((a.as_bytes().into(), b.as_bytes().into())) } + #[test] + fn key_range_aggregate() { + let ranges = [ + int_key_range(2, 4), + int_key_range(0, 4), + int_key_range(7, 10), + ]; + let aggregated = KeyRange::aggregate(ranges.iter()); + let (min, max) = aggregated.0; + assert_eq!([0, 0, 0, 0, 0, 0, 0, 0], &*min); + assert_eq!([0, 0, 0, 0, 0, 0, 0, 10], &*max); + } + mod is_disjoint { use super::*; + use test_log::test; #[test] fn key_range_number() { @@ -169,6 +235,7 @@ mod tests { mod overflap_key_range { use super::*; + use test_log::test; #[test] fn key_range_overlap() { @@ -195,6 +262,7 @@ mod tests { mod overlaps_with_bounds { use super::*; use std::ops::Bound::{Excluded, Included, Unbounded}; + use test_log::test; #[test] fn inclusive() { diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index f8a0bb3..2b14cdf 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -2,8 +2,8 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{key_range::KeyRange, segment::meta::SegmentId, Segment}; -use std::sync::Arc; +use crate::{key_range::KeyRange, segment::meta::SegmentId, 
Segment, UserKey}; +use std::{ops::Bound, sync::Arc}; /// Level of an LSM-tree #[derive(Clone, Debug)] @@ -67,18 +67,22 @@ impl Level { } } + /// Sorts the level by key range ascending. + /// + /// segment 1 segment 2 segment 3 + /// [key:a] [key:c] [key:z] pub(crate) fn sort_by_key_range(&mut self) { self.segments .sort_by(|a, b| a.metadata.key_range.0.cmp(&b.metadata.key_range.0)); } - /// Sorts the level from newest to oldest + /// Sorts the level from newest to oldest. /// /// This will make segments with highest seqno get checked first, /// so if there are two versions of an item, the fresher one is seen first: /// - /// segment a segment b - /// [key:asd:2] [key:asd:1] + /// segment 1 segment 2 + /// [key:asd:2] [key:asd:1] /// /// point read -----------> pub(crate) fn sort_by_seqno(&mut self) { @@ -128,21 +132,77 @@ impl Level { .filter(|x| x.metadata.key_range.overlaps_with_key_range(key_range)) } + pub fn as_disjoint(&self) -> Option> { + if self.is_disjoint { + Some(DisjointLevel(self)) + } else { + None + } + } +} + +#[allow(clippy::module_name_repetitions)] +pub struct DisjointLevel<'a>(&'a Level); + +impl<'a> DisjointLevel<'a> { /// Returns the segment that possibly contains the key. - /// - /// This only works for disjoint levels. - /// - /// # Panics - /// - /// Panics if the level is not disjoint. 
pub fn get_segment_containing_key>(&self, key: K) -> Option> { - assert!(self.is_disjoint, "level is not disjoint"); + let level = &self.0; - let idx = self + let idx = level .segments .partition_point(|x| &*x.metadata.key_range.1 < key.as_ref()); - self.segments.get(idx).cloned() + level.segments.get(idx).cloned() + } + + pub fn range_indexes( + &'a self, + key_range: &'a (Bound, Bound), + ) -> Option<(usize, usize)> { + let level = &self.0; + + let lo = match &key_range.0 { + Bound::Unbounded => 0, + Bound::Included(start_key) => { + level.partition_point(|segment| &segment.metadata.key_range.1 < start_key) + } + Bound::Excluded(start_key) => { + level.partition_point(|segment| &segment.metadata.key_range.1 <= start_key) + } + }; + + if lo >= level.len() { + return None; + } + + let hi = match &key_range.1 { + Bound::Unbounded => level.len() - 1, + Bound::Included(end_key) => { + let idx = level.partition_point(|segment| &segment.metadata.key_range.0 <= end_key); + + if idx == 0 { + return None; + } + + idx.saturating_sub(1) // To avoid underflow + } + Bound::Excluded(end_key) => { + let idx = level.partition_point(|segment| &segment.metadata.key_range.0 < end_key); + + if idx == 0 { + return None; + } + + idx.saturating_sub(1) // To avoid underflow + } + }; + + if lo > hi { + return None; + } + + Some((lo, hi)) } } @@ -158,9 +218,10 @@ mod tests { block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, + value_block::BlockOffset, Segment, }, - AbstractTree, + AbstractTree, Slice, }; use std::sync::Arc; use test_log::test; @@ -178,13 +239,13 @@ mod tests { block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), offsets: FileOffsets { - bloom_ptr: 0, - range_filter_ptr: 0, - index_block_ptr: 0, - metadata_ptr: 0, - range_tombstones_ptr: 0, - tli_ptr: 0, - pfx_ptr: 0, + bloom_ptr: BlockOffset(0), + range_filter_ptr: BlockOffset(0), + index_block_ptr: BlockOffset(0), + metadata_ptr: 
BlockOffset(0), + range_tombstones_ptr: BlockOffset(0), + tli_ptr: BlockOffset(0), + pfx_ptr: BlockOffset(0), }, metadata: Metadata { @@ -212,6 +273,98 @@ mod tests { }) } + #[test] + #[allow(clippy::unwrap_used)] + fn level_disjoint_cull() { + let level = Level { + is_disjoint: true, + segments: vec![ + fixture_segment(0, KeyRange::new((Slice::from("a"), Slice::from("c")))), + fixture_segment(1, KeyRange::new((Slice::from("d"), Slice::from("g")))), + fixture_segment(2, KeyRange::new((Slice::from("h"), Slice::from("k")))), + ], + }; + let level = level.as_disjoint().unwrap(); + + { + let range = (Bound::Unbounded, Bound::Included(Slice::from("0"))); + let indexes = level.range_indexes(&range); + assert_eq!(None, indexes); + } + + { + let range = (Bound::Included(Slice::from("l")), Bound::Unbounded); + let indexes = level.range_indexes(&range); + assert_eq!(None, indexes); + } + + { + let range = ( + Bound::Included(Slice::from("d")), + Bound::Included(Slice::from("g")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((1, 1)), indexes); + } + + { + let range = ( + Bound::Excluded(Slice::from("d")), + Bound::Included(Slice::from("g")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((1, 1)), indexes); + } + + { + let range = ( + Bound::Included(Slice::from("d")), + Bound::Excluded(Slice::from("h")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((1, 1)), indexes); + } + + { + let range = ( + Bound::Included(Slice::from("d")), + Bound::Included(Slice::from("h")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((1, 2)), indexes); + } + + { + let range = (Bound::Included(Slice::from("d")), Bound::Unbounded); + let indexes = level.range_indexes(&range); + assert_eq!(Some((1, 2)), indexes); + } + + { + let range = ( + Bound::Included(Slice::from("a")), + Bound::Included(Slice::from("d")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((0, 1)), indexes); + } + + { + let 
range = ( + Bound::Included(Slice::from("a")), + Bound::Excluded(Slice::from("d")), + ); + let indexes = level.range_indexes(&range); + assert_eq!(Some((0, 0)), indexes); + } + + { + let range = (Bound::Unbounded, Bound::Unbounded); + let indexes = level.range_indexes(&range); + assert_eq!(Some((0, 2)), indexes); + } + } + #[test] fn level_disjoint() -> crate::Result<()> { let folder = tempfile::tempdir()?; diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index de8b917..eee30d0 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -22,6 +22,8 @@ use std::{ pub type HiddenSet = HashSet; +type Levels = Vec>; + /// Represents the levels of a log-structured merge tree. pub struct LevelManifest { /// Path of level manifest file @@ -29,7 +31,7 @@ pub struct LevelManifest { /// Actual levels containing segments #[doc(hidden)] - pub levels: Vec, + pub levels: Levels, /// Set of segment IDs that are masked /// @@ -107,12 +109,10 @@ impl LevelManifest { pub(crate) fn create_new>(level_count: u8, path: P) -> crate::Result { assert!(level_count > 0, "level_count should be >= 1"); - let levels = (0..level_count) - .map(|_| Level::default()) - .collect::>(); + let levels = (0..level_count).map(|_| Arc::default()).collect::>(); #[allow(unused_mut)] - let mut levels = Self { + let mut manifest = Self { path: path.as_ref().to_path_buf(), levels, hidden_set: HashSet::with_capacity_and_hasher( @@ -120,9 +120,9 @@ impl LevelManifest { xxhash_rust::xxh3::Xxh3Builder::new(), ), }; - Self::write_to_disk(path, &levels.levels)?; + Self::write_to_disk(path, &manifest.deep_clone())?; - Ok(levels) + Ok(manifest) } pub(crate) fn load_level_manifest>( @@ -169,7 +169,7 @@ impl LevelManifest { fn resolve_levels( level_manifest: Vec>, segments: &HashMap>, - ) -> Vec { + ) -> Levels { let mut levels = Vec::with_capacity(level_manifest.len()); for level in level_manifest { @@ -180,7 +180,7 @@ impl LevelManifest { created_level.insert(segment); } - 
levels.push(created_level); + levels.push(Arc::new(created_level)); } levels @@ -228,47 +228,54 @@ impl LevelManifest { Ok(()) } + /// Clones the level to get a mutable copy for atomic swap. + fn deep_clone(&self) -> Vec { + self.levels + .iter() + .map(|x| Level { + segments: x.segments.clone(), + is_disjoint: x.is_disjoint, + }) + .collect() + } + /// Modifies the level manifest atomically. pub(crate) fn atomic_swap)>(&mut self, f: F) -> crate::Result<()> { - // NOTE: Create a copy of the levels we can operate on + // NOTE: Copy-on-write... + // + // Create a copy of the levels we can operate on // without mutating the current level manifest // If persisting to disk fails, this way the level manifest // is unchanged - let mut working_copy = self.levels.clone(); + let mut working_copy = self.deep_clone(); f(&mut working_copy); Self::write_to_disk(&self.path, &working_copy)?; - self.levels = working_copy; + self.levels = working_copy.into_iter().map(Arc::new).collect(); + self.sort_levels(); log::trace!("Swapped level manifest to:\n{self}"); Ok(()) } - // NOTE: Used in tests #[allow(unused)] + #[cfg(test)] pub(crate) fn add(&mut self, segment: Arc) { self.insert_into_level(0, segment); } - /// Sorts all levels from newest to oldest - /// - /// This will make segments with highest seqno get checked first, - /// so if there are two versions of an item, the fresher one is seen first: - /// - /// segment a segment b - /// [key:asd:2] [key:asd:1] - /// - /// point read -----------> pub(crate) fn sort_levels(&mut self) { for level in &mut self.levels { - level.sort(); + Arc::get_mut(level) + .expect("could not get mutable Arc - this is a bug") + .sort(); } } - // NOTE: Used in tests #[allow(unused)] + #[cfg(test)] pub(crate) fn insert_into_level(&mut self, level_no: u8, segment: Arc) { let last_level_index = self.depth() - 1; let index = level_no.clamp(0, last_level_index); @@ -278,6 +285,8 @@ impl LevelManifest { .get_mut(index as usize) .expect("level should exist"); + 
let level = Arc::get_mut(level).expect("only used in tests"); + level.insert(segment); } @@ -304,7 +313,7 @@ impl LevelManifest { #[must_use] pub fn first_level_segment_count(&self) -> usize { - self.levels.first().map(Level::len).unwrap_or_default() + self.levels.first().map(|lvl| lvl.len()).unwrap_or_default() } /// Returns the amount of levels in the tree @@ -316,7 +325,7 @@ impl LevelManifest { /// Returns the amount of segments, summed over all levels #[must_use] pub fn len(&self) -> usize { - self.levels.iter().map(Level::len).sum() + self.levels.iter().map(|lvl| lvl.len()).sum() } /// Returns the (compressed) size of all segments @@ -351,13 +360,13 @@ impl LevelManifest { let mut output = Vec::with_capacity(self.len()); for raw_level in &self.levels { - let mut level = raw_level.clone(); - - for id in &self.hidden_set { - level.remove(*id); - } + let mut level = raw_level.iter().cloned().collect::>(); + level.retain(|x| !self.hidden_set.contains(&x.metadata.id)); - output.push(level); + output.push(Level { + segments: level, + is_disjoint: raw_level.is_disjoint, + }); } output @@ -464,13 +473,13 @@ mod tests { #[test] fn level_manifest_raw_empty() -> crate::Result<()> { - let levels = LevelManifest { + let manifest = LevelManifest { hidden_set: HashSet::default(), levels: Vec::default(), path: "a".into(), }; - let bytes = levels.levels.encode_into_vec()?; + let bytes = manifest.deep_clone().encode_into_vec()?; #[rustfmt::skip] let raw = &[ diff --git a/src/lib.rs b/src/lib.rs index ec5ef86..dd5169b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,6 +152,7 @@ mod memtable; #[doc(hidden)] pub mod merge; +mod multi_reader; mod mvcc_stream; mod path; diff --git a/src/segment/multi_reader.rs b/src/multi_reader.rs similarity index 99% rename from src/segment/multi_reader.rs rename to src/multi_reader.rs index 2c2aa34..f319a0c 100644 --- a/src/segment/multi_reader.rs +++ b/src/multi_reader.rs @@ -5,7 +5,7 @@ use crate::InternalValue; use std::collections::VecDeque; 
-/// Reads through a disjoint, sorted set of segment readers +/// Reads through a disjoint, sorted set of readers pub struct MultiReader>> { readers: VecDeque, } diff --git a/src/range.rs b/src/range.rs index a448287..df3d653 100644 --- a/src/range.rs +++ b/src/range.rs @@ -7,15 +7,16 @@ use crate::{ level_manifest::LevelManifest, memtable::Memtable, merge::{BoxedIterator, Merger}, + multi_reader::MultiReader, mvcc_stream::MvccStream, - segment::{multi_reader::MultiReader, range::Range as RangeReader}, + segment::level_reader::LevelReader, tree::inner::SealedMemtables, value::{SeqNo, UserKey}, KvPair, }; use guardian::ArcRwLockReadGuardian; use self_cell::self_cell; -use std::{collections::VecDeque, ops::Bound, sync::Arc}; +use std::{ops::Bound, sync::Arc}; #[must_use] pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool { @@ -81,20 +82,45 @@ impl DoubleEndedIterator for TreeIter { fn collect_disjoint_tree_with_range( level_manifest: &LevelManifest, bounds: &(Bound, Bound), -) -> MultiReader { - // TODO: bench... can probably be optimized by not linearly filtering, but using binary search etc. 
- // TODO: binary-filter per level and collect instead of sorting and whatever - let mut segments: Vec<_> = level_manifest - .iter() - .filter(|x| x.check_key_range_overlap(bounds)) - .collect(); +) -> MultiReader { + debug_assert!(level_manifest.is_disjoint()); - segments.sort_by(|a, b| a.metadata.key_range.0.cmp(&b.metadata.key_range.0)); + let mut levels = level_manifest + .levels + .iter() + .filter(|x| !x.is_empty()) + .cloned() + .collect::>(); + + // TODO: save key range per level, makes key range sorting easier + // and can remove levels not needed + + // NOTE: We know the levels are disjoint to each other, so we can just sort + // them by comparing the first segment + // + // NOTE: Also, we filter out levels that are empty, so expect is fine + #[allow(clippy::expect_used)] + levels.sort_by(|a, b| { + a.segments + .first() + .expect("level should not be empty") + .metadata + .key_range + .0 + .cmp( + &b.segments + .first() + .expect("level should not be empty") + .metadata + .key_range + .0, + ) + }); - let readers: VecDeque<_> = segments + let readers = levels .into_iter() - .map(|x| x.range(bounds.clone())) - .collect::>(); + .map(|lvl| LevelReader::new(lvl, bounds)) + .collect(); MultiReader::new(readers) } @@ -171,30 +197,16 @@ impl TreeIter { } else { for level in &level_manifest.levels { if level.is_disjoint { - let mut level = level.clone(); - - let mut readers: VecDeque> = VecDeque::new(); - - level.sort_by_key_range(); - - // TODO: can probably be optimized by using binary search per disjoint level to filter segments - for segment in &level.segments { - if segment.check_key_range_overlap(&bounds) { - let range = segment.range(bounds.clone()); - readers.push_back(Box::new(range)); - } - } - - if !readers.is_empty() { - let multi_reader = MultiReader::new(readers); + if !level.is_empty() { + let reader = LevelReader::new(level.clone(), &bounds); if let Some(seqno) = seqno { - iters.push(Box::new(multi_reader.filter(move |item| match item { + 
iters.push(Box::new(reader.filter(move |item| match item { Ok(item) => seqno_filter(item.key.seqno, seqno), Err(_) => true, }))); } else { - iters.push(Box::new(multi_reader)); + iters.push(Box::new(reader)); } } } else { diff --git a/src/segment/block/header.rs b/src/segment/block/header.rs index 99b9042..b6ae5c3 100644 --- a/src/segment/block/header.rs +++ b/src/segment/block/header.rs @@ -6,7 +6,7 @@ use super::checksum::Checksum; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError}, file::MAGIC_BYTES, - segment::meta::CompressionType, + segment::{meta::CompressionType, value_block::BlockOffset}, }; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; @@ -21,7 +21,7 @@ pub struct Header { pub checksum: Checksum, /// File offset of previous block - only used for data blocks - pub previous_block_offset: u64, + pub previous_block_offset: BlockOffset, /// Compressed size of data segment pub data_length: u32, @@ -59,7 +59,7 @@ impl Encode for Header { writer.write_u64::(*self.checksum)?; // Write prev offset - writer.write_u64::(self.previous_block_offset)?; + writer.write_u64::(*self.previous_block_offset)?; // Write data length writer.write_u32::(self.data_length)?; @@ -98,7 +98,7 @@ impl Decode for Header { Ok(Self { compression, checksum: Checksum::from_raw(checksum), - previous_block_offset, + previous_block_offset: BlockOffset(previous_block_offset), data_length, uncompressed_length, }) @@ -116,7 +116,7 @@ mod tests { let header = Header { compression: CompressionType::None, checksum: Checksum::from_raw(4), - previous_block_offset: 2, + previous_block_offset: BlockOffset(2), data_length: 15, uncompressed_length: 15, }; diff --git a/src/segment/block/mod.rs b/src/segment/block/mod.rs index bd06790..eb5fa38 100644 --- a/src/segment/block/mod.rs +++ b/src/segment/block/mod.rs @@ -5,7 +5,7 @@ pub mod checksum; pub mod header; -use super::meta::CompressionType; +use super::{meta::CompressionType, value_block::BlockOffset}; 
use crate::coding::{Decode, Encode}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use checksum::Checksum; @@ -80,15 +80,15 @@ impl Block { pub fn from_file( reader: &mut R, - offset: u64, + offset: BlockOffset, ) -> crate::Result { - reader.seek(std::io::SeekFrom::Start(offset))?; + reader.seek(std::io::SeekFrom::Start(*offset))?; Self::from_reader(reader) } pub fn to_bytes_compressed( items: &[T], - previous_block_offset: u64, + previous_block_offset: BlockOffset, compression: CompressionType, ) -> crate::Result<(BlockHeader, Vec)> { let packed = Self::pack_items(items, compression)?; @@ -157,7 +157,8 @@ mod tests { // Serialize to bytes let mut serialized = Vec::new(); - let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::None)?; + let (header, data) = + ValueBlock::to_bytes_compressed(&items, BlockOffset(0), CompressionType::None)?; header.encode_into(&mut serialized)?; serialized.write_all(&data)?; @@ -197,7 +198,8 @@ mod tests { // Serialize to bytes let mut serialized = Vec::new(); - let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::None)?; + let (header, data) = + ValueBlock::to_bytes_compressed(&items, BlockOffset(0), CompressionType::None)?; header.encode_into(&mut serialized)?; serialized.write_all(&data)?; diff --git a/src/segment/block_index/block_handle.rs b/src/segment/block_index/block_handle.rs index 72413a2..6845c23 100644 --- a/src/segment/block_index/block_handle.rs +++ b/src/segment/block_index/block_handle.rs @@ -4,7 +4,7 @@ use crate::{ coding::{Decode, DecodeError, Encode, EncodeError}, - segment::block::ItemSize, + segment::{block::ItemSize, value_block::BlockOffset}, value::UserKey, Slice, }; @@ -19,12 +19,12 @@ pub struct KeyedBlockHandle { pub end_key: UserKey, /// Position of block in file - pub offset: u64, + pub offset: BlockOffset, } impl KeyedBlockHandle { #[must_use] - pub fn new>(end_key: K, offset: u64) -> Self { + pub fn new>(end_key: K, offset: BlockOffset) 
-> Self { Self { end_key: end_key.into(), offset, @@ -34,7 +34,7 @@ impl KeyedBlockHandle { impl ItemSize for KeyedBlockHandle { fn size(&self) -> usize { - std::mem::size_of::() + self.end_key.len() + std::mem::size_of::() + self.end_key.len() } } @@ -47,7 +47,7 @@ impl Eq for KeyedBlockHandle {} impl std::hash::Hash for KeyedBlockHandle { fn hash(&self, state: &mut H) { - state.write_u64(self.offset); + state.write_u64(*self.offset); } } @@ -65,7 +65,7 @@ impl Ord for KeyedBlockHandle { impl Encode for KeyedBlockHandle { fn encode_into(&self, writer: &mut W) -> Result<(), EncodeError> { - writer.write_u64_varint(self.offset)?; + writer.write_u64_varint(*self.offset)?; // NOTE: Truncation is okay and actually needed #[allow(clippy::cast_possible_truncation)] @@ -88,7 +88,7 @@ impl Decode for KeyedBlockHandle { reader.read_exact(&mut key)?; Ok(Self { - offset, + offset: BlockOffset(offset), end_key: Slice::from(key), }) } @@ -101,8 +101,8 @@ mod tests { #[test] fn index_block_size() { let items = [ - KeyedBlockHandle::new("abcd", 5), - KeyedBlockHandle::new("efghij", 10), + KeyedBlockHandle::new("abcd", BlockOffset(5)), + KeyedBlockHandle::new("efghij", BlockOffset(10)), ]; assert_eq!(26, items.size()); } diff --git a/src/segment/block_index/mod.rs b/src/segment/block_index/mod.rs index c791d89..b8f85e5 100644 --- a/src/segment/block_index/mod.rs +++ b/src/segment/block_index/mod.rs @@ -77,10 +77,13 @@ pub trait BlockIndex { #[allow(clippy::expect_used)] mod tests { use super::*; - use crate::{segment::block_index::BlockIndex, Slice}; + use crate::{ + segment::{block_index::BlockIndex, value_block::BlockOffset}, + Slice, + }; use test_log::test; - fn bh>(end_key: K, offset: u64) -> KeyedBlockHandle { + fn bh>(end_key: K, offset: BlockOffset) -> KeyedBlockHandle { KeyedBlockHandle { end_key: end_key.into(), offset, @@ -90,11 +93,11 @@ mod tests { #[test] fn block_handle_array_lowest() { let index = [ - bh(*b"c", 0), - bh(*b"g", 10), - bh(*b"g", 20), - bh(*b"l", 30), 
- bh(*b"t", 40), + bh(*b"c", BlockOffset(0)), + bh(*b"g", BlockOffset(10)), + bh(*b"g", BlockOffset(20)), + bh(*b"l", BlockOffset(30)), + bh(*b"t", BlockOffset(40)), ]; { @@ -104,7 +107,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, BlockOffset(0)); } { @@ -114,7 +117,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, BlockOffset(0)); } { @@ -124,7 +127,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, BlockOffset(0)); } { @@ -134,7 +137,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"g"); - assert_eq!(handle.offset, 10); + assert_eq!(handle.offset, BlockOffset(10)); } { @@ -144,7 +147,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"l"); - assert_eq!(handle.offset, 30); + assert_eq!(handle.offset, BlockOffset(30)); } { @@ -154,7 +157,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"t"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -164,7 +167,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"t"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -179,13 +182,13 @@ mod tests { #[test] fn block_handle_array_spanning_lowest() { let index = [ - bh(*b"a", 0), - bh(*b"a", 10), - bh(*b"a", 20), - bh(*b"a", 30), - bh(*b"b", 40), - bh(*b"b", 50), - bh(*b"c", 60), + bh(*b"a", BlockOffset(0)), + bh(*b"a", BlockOffset(10)), + bh(*b"a", BlockOffset(20)), + bh(*b"a", BlockOffset(30)), + bh(*b"b", BlockOffset(40)), + bh(*b"b", BlockOffset(50)), + bh(*b"c", BlockOffset(60)), ]; { @@ -195,7 +198,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"a"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, 
BlockOffset(0)); } { @@ -205,7 +208,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"a"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, BlockOffset(0)); } { @@ -215,7 +218,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"b"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -225,7 +228,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"b"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -235,7 +238,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 60); + assert_eq!(handle.offset, BlockOffset(60)); } { @@ -250,13 +253,13 @@ mod tests { #[test] fn block_handle_array_last_of_key() { let index = [ - bh(*b"a", 0), - bh(*b"a", 10), - bh(*b"a", 20), - bh(*b"a", 30), - bh(*b"b", 40), - bh(*b"b", 50), - bh(*b"c", 60), + bh(*b"a", BlockOffset(0)), + bh(*b"a", BlockOffset(10)), + bh(*b"a", BlockOffset(20)), + bh(*b"a", BlockOffset(30)), + bh(*b"b", BlockOffset(40)), + bh(*b"b", BlockOffset(50)), + bh(*b"c", BlockOffset(60)), ]; { @@ -266,7 +269,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"a"); - assert_eq!(handle.offset, 0); + assert_eq!(handle.offset, BlockOffset(0)); } { @@ -276,7 +279,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"b"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -286,7 +289,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"b"); - assert_eq!(handle.offset, 40); + assert_eq!(handle.offset, BlockOffset(40)); } { @@ -296,7 +299,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 60); + assert_eq!(handle.offset, BlockOffset(60)); } { @@ -306,7 +309,7 @@ mod tests { .expect("should exist"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 60); + 
assert_eq!(handle.offset, BlockOffset(60)); } { @@ -321,13 +324,13 @@ mod tests { #[test] fn block_handle_array_last() { let index = [ - bh(*b"a", 0), - bh(*b"a", 10), - bh(*b"a", 20), - bh(*b"a", 30), - bh(*b"b", 40), - bh(*b"b", 50), - bh(*b"c", 60), + bh(*b"a", BlockOffset(0)), + bh(*b"a", BlockOffset(10)), + bh(*b"a", BlockOffset(20)), + bh(*b"a", BlockOffset(30)), + bh(*b"b", BlockOffset(40)), + bh(*b"b", BlockOffset(50)), + bh(*b"c", BlockOffset(60)), ]; { @@ -336,7 +339,7 @@ mod tests { .expect("cannot fail"); assert_eq!(&*handle.end_key, *b"c"); - assert_eq!(handle.offset, 60); + assert_eq!(handle.offset, BlockOffset(60)); } } } diff --git a/src/segment/block_index/top_level.rs b/src/segment/block_index/top_level.rs index c4a1be5..71672bf 100644 --- a/src/segment/block_index/top_level.rs +++ b/src/segment/block_index/top_level.rs @@ -3,7 +3,10 @@ // (found in the LICENSE-* files in the repository) use super::{block_handle::KeyedBlockHandle, BlockIndex}; -use crate::segment::{block_index::IndexBlock, value_block::CachePolicy}; +use crate::segment::{ + block_index::IndexBlock, + value_block::{BlockOffset, CachePolicy}, +}; use std::{fs::File, path::Path}; /// The block index stores references to the positions of blocks on a file and their size @@ -36,7 +39,7 @@ impl TopLevelIndex { } /// Loads a top-level index from disk - pub fn from_file>(path: P, offset: u64) -> crate::Result { + pub fn from_file>(path: P, offset: BlockOffset) -> crate::Result { let path = path.as_ref(); log::trace!("reading TLI from {path:?}, offset={offset}"); diff --git a/src/segment/block_index/two_level_index.rs b/src/segment/block_index/two_level_index.rs index ffdb284..c66649e 100644 --- a/src/segment/block_index/two_level_index.rs +++ b/src/segment/block_index/two_level_index.rs @@ -8,7 +8,10 @@ use super::{ top_level::TopLevelIndex, BlockIndex, IndexBlock, }; -use crate::{block_cache::BlockCache, descriptor_table::FileDescriptorTable}; +use crate::{ + block_cache::BlockCache, 
descriptor_table::FileDescriptorTable, + segment::value_block::BlockOffset, +}; use std::{path::Path, sync::Arc}; /// Allows reading index blocks - just a wrapper around a block cache @@ -16,12 +19,12 @@ use std::{path::Path, sync::Arc}; pub struct IndexBlockFetcher(Arc); impl IndexBlockFetcher { - pub fn insert(&self, segment_id: GlobalSegmentId, offset: u64, value: Arc) { + pub fn insert(&self, segment_id: GlobalSegmentId, offset: BlockOffset, value: Arc) { self.0.insert_index_block(segment_id, offset, value); } #[must_use] - pub fn get(&self, segment_id: GlobalSegmentId, offset: u64) -> Option> { + pub fn get(&self, segment_id: GlobalSegmentId, offset: BlockOffset) -> Option> { self.0.get_index_block(segment_id, offset) } } @@ -185,7 +188,7 @@ impl TwoLevelBlockIndex { pub fn from_file>( file_path: P, - offset: u64, + offset: BlockOffset, segment_id: GlobalSegmentId, descriptor_table: Arc, block_cache: Arc, diff --git a/src/segment/block_index/writer.rs b/src/segment/block_index/writer.rs index f206199..49342f6 100644 --- a/src/segment/block_index/writer.rs +++ b/src/segment/block_index/writer.rs @@ -5,7 +5,9 @@ use super::{IndexBlock, KeyedBlockHandle}; use crate::{ coding::Encode, - segment::{block::header::Header as BlockHeader, meta::CompressionType}, + segment::{ + block::header::Header as BlockHeader, meta::CompressionType, value_block::BlockOffset, + }, value::UserKey, }; use std::{ @@ -16,7 +18,7 @@ use std::{ pub struct Writer { file_pos: u64, - prev_pos: (u64, u64), + prev_pos: (BlockOffset, BlockOffset), write_buffer: Vec, @@ -35,13 +37,13 @@ impl Writer { pub fn new(block_size: u32) -> crate::Result { Ok(Self { file_pos: 0, - prev_pos: (0, 0), - write_buffer: Vec::with_capacity(block_size as usize), + prev_pos: (BlockOffset(0), BlockOffset(0)), + write_buffer: Vec::with_capacity(u16::MAX.into()), buffer_size: 0, block_size, compression: CompressionType::None, - block_handles: Vec::with_capacity(1_000), - tli_pointers: Vec::with_capacity(1_000), + 
block_handles: Vec::new(), + tli_pointers: Vec::new(), block_count: 0, }) } @@ -63,44 +65,46 @@ impl Writer { header.encode_into(&mut self.write_buffer)?; self.write_buffer.write_all(&data)?; - let bytes_written = (BlockHeader::serialized_len() + data.len()) as u64; + // NOTE: Expect is fine, the block size definitely fits into u64 + #[allow(clippy::expect_used)] + let bytes_written: u64 = (BlockHeader::serialized_len() + data.len()) + .try_into() + .expect("block size should fit into u64"); // NOTE: Expect is fine, because the chunk is not empty + // + // Also, we are allowed to remove the last item + // to get ownership of it, because the chunk is cleared after + // this anyway #[allow(clippy::expect_used)] - let last = self - .block_handles - .last() - .expect("Chunk should not be empty"); + let last = self.block_handles.pop().expect("Chunk should not be empty"); let index_block_handle = KeyedBlockHandle { - end_key: last.end_key.clone(), - offset: self.file_pos, + end_key: last.end_key, + offset: BlockOffset(self.file_pos), }; self.tli_pointers.push(index_block_handle); self.buffer_size = 0; self.file_pos += bytes_written; + self.block_count += 1; + // Back link stuff self.prev_pos.0 = self.prev_pos.1; self.prev_pos.1 += bytes_written; - self.block_count += 1; - self.block_handles.clear(); Ok(()) } - pub fn register_block(&mut self, start_key: UserKey, offset: u64) -> crate::Result<()> { + pub fn register_block(&mut self, end_key: UserKey, offset: BlockOffset) -> crate::Result<()> { // NOTE: Truncation is OK, because a key is bound by 65535 bytes, so can never exceed u32s #[allow(clippy::cast_possible_truncation)] - let block_handle_size = (start_key.len() + std::mem::size_of::()) as u32; + let block_handle_size = (end_key.len() + std::mem::size_of::()) as u32; - let block_handle = KeyedBlockHandle { - end_key: start_key, - offset, - }; + let block_handle = KeyedBlockHandle { end_key, offset }; self.block_handles.push(block_handle); @@ -116,7 +120,7 @@ impl 
Writer { fn write_top_level_index( &mut self, block_file_writer: &mut BufWriter, - file_offset: u64, + file_offset: BlockOffset, ) -> crate::Result { block_file_writer.write_all(&self.write_buffer)?; let tli_ptr = block_file_writer.stream_position()?; @@ -129,7 +133,7 @@ impl Writer { // Write to file let (header, data) = - IndexBlock::to_bytes_compressed(&self.tli_pointers, 0, self.compression)?; + IndexBlock::to_bytes_compressed(&self.tli_pointers, BlockOffset(0), self.compression)?; header.encode_into(block_file_writer)?; block_file_writer.write_all(&data)?; @@ -149,14 +153,17 @@ impl Writer { } /// Returns the offset in the file to TLI - pub fn finish(&mut self, block_file_writer: &mut BufWriter) -> crate::Result { + pub fn finish( + &mut self, + block_file_writer: &mut BufWriter, + ) -> crate::Result { if self.buffer_size > 0 { self.write_block()?; } - let index_block_ptr = block_file_writer.stream_position()?; + let index_block_ptr = BlockOffset(block_file_writer.stream_position()?); let tli_ptr = self.write_top_level_index(block_file_writer, index_block_ptr)?; - Ok(tli_ptr) + Ok(BlockOffset(tli_ptr)) } } diff --git a/src/segment/file_offsets.rs b/src/segment/file_offsets.rs index ed2ff8c..632d068 100644 --- a/src/segment/file_offsets.rs +++ b/src/segment/file_offsets.rs @@ -2,25 +2,26 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) +use super::value_block::BlockOffset; use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; #[derive(Debug, Default, PartialEq, Eq)] pub struct FileOffsets { - pub metadata_ptr: u64, - pub index_block_ptr: u64, - pub tli_ptr: u64, - pub bloom_ptr: u64, + pub metadata_ptr: BlockOffset, + pub index_block_ptr: BlockOffset, + pub tli_ptr: BlockOffset, + pub bloom_ptr: BlockOffset, // TODO: #46 https://github.com/fjall-rs/lsm-tree/issues/46 - pub range_filter_ptr: 
u64, + pub range_filter_ptr: BlockOffset, // TODO: #2 https://github.com/fjall-rs/lsm-tree/issues/2 - pub range_tombstones_ptr: u64, + pub range_tombstones_ptr: BlockOffset, // TODO: prefix filter for l0, l1? - pub pfx_ptr: u64, + pub pfx_ptr: BlockOffset, } impl FileOffsets { @@ -33,13 +34,13 @@ impl FileOffsets { impl Encode for FileOffsets { fn encode_into(&self, writer: &mut W) -> Result<(), EncodeError> { - writer.write_u64::(self.metadata_ptr)?; - writer.write_u64::(self.index_block_ptr)?; - writer.write_u64::(self.tli_ptr)?; - writer.write_u64::(self.bloom_ptr)?; - writer.write_u64::(self.range_filter_ptr)?; - writer.write_u64::(self.range_tombstones_ptr)?; - writer.write_u64::(self.pfx_ptr)?; + writer.write_u64::(*self.metadata_ptr)?; + writer.write_u64::(*self.index_block_ptr)?; + writer.write_u64::(*self.tli_ptr)?; + writer.write_u64::(*self.bloom_ptr)?; + writer.write_u64::(*self.range_filter_ptr)?; + writer.write_u64::(*self.range_tombstones_ptr)?; + writer.write_u64::(*self.pfx_ptr)?; Ok(()) } } @@ -55,13 +56,13 @@ impl Decode for FileOffsets { let pfx_ptr = reader.read_u64::()?; Ok(Self { - index_block_ptr, - tli_ptr, - bloom_ptr, - range_filter_ptr: rf_ptr, - range_tombstones_ptr, - pfx_ptr, - metadata_ptr, + index_block_ptr: BlockOffset(index_block_ptr), + tli_ptr: BlockOffset(tli_ptr), + bloom_ptr: BlockOffset(bloom_ptr), + range_filter_ptr: BlockOffset(rf_ptr), + range_tombstones_ptr: BlockOffset(range_tombstones_ptr), + pfx_ptr: BlockOffset(pfx_ptr), + metadata_ptr: BlockOffset(metadata_ptr), }) } } @@ -75,13 +76,13 @@ mod tests { #[test] fn file_offsets_roundtrip() -> crate::Result<()> { let before = FileOffsets { - bloom_ptr: 15, - index_block_ptr: 14, - metadata_ptr: 17, - pfx_ptr: 18, - range_filter_ptr: 13, - range_tombstones_ptr: 5, - tli_ptr: 4, + bloom_ptr: BlockOffset(15), + index_block_ptr: BlockOffset(14), + metadata_ptr: BlockOffset(17), + pfx_ptr: BlockOffset(18), + range_filter_ptr: BlockOffset(13), + range_tombstones_ptr: 
BlockOffset(5), + tli_ptr: BlockOffset(4), }; let buf = before.encode_into_vec()?; diff --git a/src/segment/level_reader.rs b/src/segment/level_reader.rs new file mode 100644 index 0000000..3e42602 --- /dev/null +++ b/src/segment/level_reader.rs @@ -0,0 +1,217 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use super::range::Range; +use crate::{level_manifest::level::Level, InternalValue, UserKey}; +use std::{ops::Bound, sync::Arc}; + +/// Reads through a disjoint level +pub struct LevelReader { + segments: Arc, + lo: usize, + hi: usize, + lo_reader: Option, + hi_reader: Option, +} + +impl LevelReader { + #[must_use] + pub fn new(level: Arc, range: &(Bound, Bound)) -> Self { + assert!(!level.is_empty(), "level reader cannot read empty level"); + + let disjoint_level = level.as_disjoint().expect("level should be disjoint"); + + let Some((lo, hi)) = disjoint_level.range_indexes(range) else { + // NOTE: We will never emit any item + return Self { + segments: level, + lo: 0, + hi: 0, + lo_reader: None, + hi_reader: None, + }; + }; + + let lo_segment = level.get(lo).expect("should exist"); + let lo_reader = lo_segment.range(range.clone()); + + let hi_reader = if hi > lo { + let hi_segment = level.get(hi).expect("should exist"); + + Some(hi_segment.range(range.clone())) + } else { + None + }; + + Self { + segments: level, + lo, + hi, + lo_reader: Some(lo_reader), + hi_reader, + } + } +} + +impl Iterator for LevelReader { + type Item = crate::Result; + + fn next(&mut self) -> Option { + loop { + if let Some(lo_reader) = &mut self.lo_reader { + if let Some(item) = lo_reader.next() { + return Some(item); + } + + // NOTE: Lo reader is empty, get next one + self.lo_reader = None; + self.lo += 1; + + if self.lo < self.hi { + self.lo_reader = Some(self.segments.get(self.lo).expect("should exist").iter()); + } + } else if let Some(hi_reader) = &mut 
self.hi_reader { + // NOTE: We reached the hi marker, so consume from it instead + // + // If it returns nothing, it is empty, so we are done + return hi_reader.next(); + } else { + return None; + } + } + } +} + +impl DoubleEndedIterator for LevelReader { + fn next_back(&mut self) -> Option { + loop { + if let Some(hi_reader) = &mut self.hi_reader { + if let Some(item) = hi_reader.next_back() { + return Some(item); + } + + // NOTE: Hi reader is empty, get orev one + self.hi_reader = None; + self.hi -= 1; + + if self.lo < self.hi { + self.hi_reader = Some(self.segments.get(self.hi).expect("should exist").iter()); + } + } else if let Some(lo_reader) = &mut self.lo_reader { + // NOTE: We reached the lo marker, so consume from it instead + // + // If it returns nothing, it is empty, so we are done + return lo_reader.next_back(); + } else { + return None; + } + } + } +} + +#[cfg(test)] +#[allow(clippy::expect_used)] +mod tests { + use super::*; + use crate::{AbstractTree, Slice}; + use std::ops::Bound::Unbounded; + use test_log::test; + + // TODO: same test for prefix & ranges + + #[test] + fn level_reader_basic() -> crate::Result<()> { + let tempdir = tempfile::tempdir()?; + let tree = crate::Config::new(&tempdir).open()?; + + let ids = [ + ["a", "b", "c"], + ["d", "e", "f"], + ["g", "h", "i"], + ["j", "k", "l"], + ]; + + for batch in ids { + for id in batch { + tree.insert(id, vec![], 0); + } + tree.flush_active_memtable(0)?; + } + + let segments = tree + .levels + .read() + .expect("lock is poisoned") + .iter() + .cloned() + .collect::>(); + + let level = Arc::new(Level { + segments, + is_disjoint: true, + }); + + #[allow(clippy::unwrap_used)] + { + let multi_reader = LevelReader::new(level.clone(), &(Unbounded, Unbounded)); + + let mut iter = multi_reader.flatten(); + + assert_eq!(Slice::from(*b"a"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"b"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"c"), 
iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"d"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"e"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"f"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"g"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"h"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"i"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"j"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"k"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"l"), iter.next().unwrap().key.user_key); + } + + #[allow(clippy::unwrap_used)] + { + let multi_reader = LevelReader::new(level.clone(), &(Unbounded, Unbounded)); + + let mut iter = multi_reader.rev().flatten(); + + assert_eq!(Slice::from(*b"l"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"k"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"j"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"i"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"h"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"g"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"f"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"e"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"d"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"c"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"b"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"a"), iter.next().unwrap().key.user_key); + } + + #[allow(clippy::unwrap_used)] + { + let multi_reader = LevelReader::new(level, &(Unbounded, Unbounded)); + + let mut iter = multi_reader.flatten(); + + assert_eq!(Slice::from(*b"a"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"l"), iter.next_back().unwrap().key.user_key); + assert_eq!(Slice::from(*b"b"), 
iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"k"), iter.next_back().unwrap().key.user_key); + assert_eq!(Slice::from(*b"c"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"j"), iter.next_back().unwrap().key.user_key); + assert_eq!(Slice::from(*b"d"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"i"), iter.next_back().unwrap().key.user_key); + assert_eq!(Slice::from(*b"e"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"h"), iter.next_back().unwrap().key.user_key); + assert_eq!(Slice::from(*b"f"), iter.next().unwrap().key.user_key); + assert_eq!(Slice::from(*b"g"), iter.next_back().unwrap().key.user_key); + } + + Ok(()) + } +} diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 0c5eb6c..1bdbe4b 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -6,8 +6,8 @@ pub mod block; pub mod block_index; pub mod file_offsets; pub mod id; +pub mod level_reader; pub mod meta; -pub mod multi_reader; pub mod multi_writer; pub mod range; pub mod reader; @@ -73,7 +73,11 @@ pub struct Segment { impl std::fmt::Debug for Segment { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Segment:{}", self.metadata.id) + write!( + f, + "Segment:{}({})", + self.metadata.id, self.metadata.key_range + ) } } @@ -195,10 +199,10 @@ impl Segment { io::{Seek, SeekFrom}, }; - assert!(bloom_ptr > 0, "can not find bloom filter block"); + assert!(*bloom_ptr > 0, "can not find bloom filter block"); let mut reader = File::open(file_path)?; - reader.seek(SeekFrom::Start(bloom_ptr))?; + reader.seek(SeekFrom::Start(*bloom_ptr))?; BloomFilter::decode_from(&mut reader)? 
}, }) diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index d5f9d4b..bc19431 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -50,7 +50,6 @@ impl MultiWriter { let writer = Writer::new(Options { segment_id: current_segment_id, folder: opts.folder.clone(), - evict_tombstones: opts.evict_tombstones, data_block_size: opts.data_block_size, index_block_size: opts.index_block_size, })?; @@ -104,7 +103,6 @@ impl MultiWriter { let mut new_writer = Writer::new(Options { segment_id: new_segment_id, folder: self.opts.folder.clone(), - evict_tombstones: self.opts.evict_tombstones, data_block_size: self.opts.data_block_size, index_block_size: self.opts.index_block_size, })? @@ -117,10 +115,8 @@ impl MultiWriter { let mut old_writer = std::mem::replace(&mut self.writer, new_writer); - if old_writer.meta.item_count > 0 { - // NOTE: if-check checks for item count - self.results - .push(old_writer.finish()?.expect("writer should emit result")); + if let Some(result) = old_writer.finish()? 
{ + self.results.push(result); } Ok(()) diff --git a/src/segment/range.rs b/src/segment/range.rs index c7dc9d3..bcfd893 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -5,6 +5,7 @@ use super::block_index::two_level_index::TwoLevelBlockIndex; use super::id::GlobalSegmentId; use super::reader::Reader; +use super::value_block::BlockOffset; use super::value_block::CachePolicy; use crate::block_cache::BlockCache; use crate::descriptor_table::FileDescriptorTable; @@ -29,7 +30,7 @@ pub struct Range { impl Range { pub fn new( - data_block_boundary: u64, + data_block_boundary: BlockOffset, descriptor_table: Arc, segment_id: GlobalSegmentId, block_cache: Arc, @@ -41,7 +42,7 @@ impl Range { descriptor_table, segment_id, block_cache, - 0, + BlockOffset(0), None, ); @@ -259,7 +260,6 @@ mod tests { let mut writer = Writer::new(Options { segment_id: 0, folder: folder.clone(), - evict_tombstones: false, data_block_size: 1_000, // NOTE: Block size 1 to for each item to be its own block index_block_size: 4_096, })?; @@ -358,7 +358,6 @@ mod tests { let mut writer = Writer::new(Options { segment_id: 0, folder: folder.clone(), - evict_tombstones: false, data_block_size: 4_096, index_block_size: 4_096, })?; @@ -558,7 +557,6 @@ mod tests { let mut writer = Writer::new(Options { segment_id: 0, folder: folder.clone(), - evict_tombstones: false, data_block_size, index_block_size: 4_096, })?; @@ -661,7 +659,6 @@ mod tests { let mut writer = Writer::new(Options { segment_id: 0, folder: folder.clone(), - evict_tombstones: false, data_block_size: 250, index_block_size: 4_096, })?; diff --git a/src/segment/reader.rs b/src/segment/reader.rs index 1032989..beaacbf 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::{ - value_block::{CachePolicy, ValueBlock}, + value_block::{BlockOffset, CachePolicy, ValueBlock}, value_block_consumer::ValueBlockConsumer, }; use crate::{ @@ -17,15 +17,15 @@ pub 
struct Reader { segment_id: GlobalSegmentId, block_cache: Arc, - data_block_boundary: u64, + data_block_boundary: BlockOffset, - pub lo_block_offset: u64, + pub lo_block_offset: BlockOffset, pub(crate) lo_block_size: u64, pub(crate) lo_block_items: Option, pub(crate) lo_initialized: bool, - pub hi_block_offset: Option, - pub hi_block_backlink: u64, + pub hi_block_offset: Option, + pub hi_block_backlink: BlockOffset, pub hi_block_items: Option, pub hi_initialized: bool, @@ -38,12 +38,12 @@ pub struct Reader { impl Reader { #[must_use] pub fn new( - data_block_boundary: u64, + data_block_boundary: BlockOffset, descriptor_table: Arc, segment_id: GlobalSegmentId, block_cache: Arc, - lo_block_offset: u64, - hi_block_offset: Option, + lo_block_offset: BlockOffset, + hi_block_offset: Option, ) -> Self { Self { data_block_boundary, @@ -58,7 +58,7 @@ impl Reader { lo_initialized: false, hi_block_offset, - hi_block_backlink: 0, + hi_block_backlink: BlockOffset(0), hi_block_items: None, hi_initialized: false, @@ -88,8 +88,8 @@ impl Reader { fn load_data_block( &self, - offset: u64, - ) -> crate::Result> { + offset: BlockOffset, + ) -> crate::Result> { let block = ValueBlock::load_by_block_handle( &self.descriptor_table, &self.block_cache, @@ -151,8 +151,9 @@ impl Iterator for Reader { // Front buffer is empty // Load next block - let next_block_offset = - self.lo_block_offset + Header::serialized_len() as u64 + self.lo_block_size; + let next_block_offset = BlockOffset( + *self.lo_block_offset + Header::serialized_len() as u64 + self.lo_block_size, + ); assert_ne!( self.lo_block_offset, next_block_offset, @@ -219,7 +220,7 @@ impl DoubleEndedIterator for Reader { // Back buffer is empty - if hi_offset == 0 { + if hi_offset == BlockOffset(0) { // We are done return None; } diff --git a/src/segment/trailer.rs b/src/segment/trailer.rs index 9df5349..907a5e3 100644 --- a/src/segment/trailer.rs +++ b/src/segment/trailer.rs @@ -50,7 +50,7 @@ impl SegmentFileTrailer { 
log::trace!("Trailer offsets: {offsets:#?}"); // Jump to metadata and parse - reader.seek(std::io::SeekFrom::Start(offsets.metadata_ptr))?; + reader.seek(std::io::SeekFrom::Start(*offsets.metadata_ptr))?; let metadata = Metadata::decode_from(&mut reader)?; Ok(Self { metadata, offsets }) @@ -61,6 +61,8 @@ impl Encode for SegmentFileTrailer { fn encode_into(&self, writer: &mut W) -> Result<(), EncodeError> { let mut v = Vec::with_capacity(TRAILER_SIZE); + // TODO: 3.0.0, magic header, too? + self.offsets.encode_into(&mut v)?; // Pad with remaining bytes diff --git a/src/segment/value_block.rs b/src/segment/value_block.rs index 9314111..4883271 100644 --- a/src/segment/value_block.rs +++ b/src/segment/value_block.rs @@ -6,6 +6,35 @@ use super::{block::Block, id::GlobalSegmentId}; use crate::{descriptor_table::FileDescriptorTable, value::InternalValue, BlockCache}; use std::sync::Arc; +#[derive(Copy, Clone, Default, Debug, std::hash::Hash, PartialEq, Eq, Ord, PartialOrd)] +pub struct BlockOffset(pub u64); + +impl std::ops::Deref for BlockOffset { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::AddAssign for BlockOffset { + fn add_assign(&mut self, rhs: Self) { + *self += *rhs; + } +} + +impl std::ops::AddAssign for BlockOffset { + fn add_assign(&mut self, rhs: u64) { + self.0 += rhs; + } +} + +impl std::fmt::Display for BlockOffset { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum CachePolicy { /// Read cached blocks, but do not change cache @@ -36,7 +65,7 @@ impl ValueBlock { descriptor_table: &FileDescriptorTable, block_cache: &BlockCache, segment_id: GlobalSegmentId, - offset: u64, + offset: BlockOffset, cache_policy: CachePolicy, ) -> crate::Result>> { Ok( @@ -115,7 +144,7 @@ mod tests { compression: CompressionType::None, checksum: Checksum::from_raw(0), data_length: 0, - previous_block_offset: 0, + 
previous_block_offset: BlockOffset(0), uncompressed_length: 0, }, }; diff --git a/src/segment/value_block_consumer.rs b/src/segment/value_block_consumer.rs index 5ae455a..5e70efd 100644 --- a/src/segment/value_block_consumer.rs +++ b/src/segment/value_block_consumer.rs @@ -91,7 +91,10 @@ impl DoubleEndedIterator for ValueBlockConsumer { mod tests { use super::*; use crate::{ - segment::block::{checksum::Checksum, header::Header}, + segment::{ + block::{checksum::Checksum, header::Header}, + value_block::BlockOffset, + }, Slice, }; use test_log::test; @@ -112,7 +115,7 @@ mod tests { compression: crate::segment::meta::CompressionType::None, checksum: Checksum::from_raw(0), data_length: 0, - previous_block_offset: 0, + previous_block_offset: BlockOffset(0), uncompressed_length: 0, }, items: items.into_boxed_slice(), diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs index 0887653..a231d35 100644 --- a/src/segment/writer/mod.rs +++ b/src/segment/writer/mod.rs @@ -15,7 +15,7 @@ use super::{ use crate::{ coding::Encode, file::fsync_directory, - segment::block::ItemSize, + segment::{block::ItemSize, value_block::BlockOffset}, value::{InternalValue, UserKey}, SegmentId, }; @@ -51,7 +51,7 @@ pub struct Writer { pub(crate) meta: meta::Metadata, /// Stores the previous block position (used for creating back links) - prev_pos: (u64, u64), + prev_pos: (BlockOffset, BlockOffset), current_key: Option, @@ -92,7 +92,6 @@ impl BloomConstructionPolicy { pub struct Options { pub folder: PathBuf, - pub evict_tombstones: bool, pub data_block_size: u32, pub index_block_size: u32, pub segment_id: SegmentId, @@ -108,7 +107,7 @@ impl Writer { let index_writer = IndexWriter::new(opts.index_block_size)?; - let chunk = Vec::with_capacity(10_000); + let chunk = Vec::new(); Ok(Self { opts, @@ -122,7 +121,7 @@ impl Writer { index_writer, chunk, - prev_pos: (0, 0), + prev_pos: (BlockOffset(0), BlockOffset(0)), chunk_size: 0, @@ -160,25 +159,41 @@ impl Writer { return Ok(()); }; - 
// Write to file let (header, data) = ValueBlock::to_bytes_compressed(&self.chunk, self.prev_pos.0, self.compression)?; self.meta.uncompressed_size += u64::from(header.uncompressed_length); header.encode_into(&mut self.block_writer)?; + + // Write to file self.block_writer.write_all(&data)?; let bytes_written = (BlockHeader::serialized_len() + data.len()) as u64; self.index_writer - .register_block(last.key.user_key.clone(), self.meta.file_pos)?; + .register_block(last.key.user_key.clone(), BlockOffset(self.meta.file_pos))?; // Adjust metadata self.meta.file_pos += bytes_written; self.meta.item_count += self.chunk.len(); self.meta.data_block_count += 1; + self.meta.last_key = Some( + // NOTE: Expect is fine, because the chunk is not empty + // + // Also, we are allowed to remove the last item + // to get ownership of it, because the chunk is cleared after + // this anyway + #[allow(clippy::expect_used)] + self.chunk + .pop() + .expect("chunk should not be empty") + .key + .user_key, + ); + + // Back link stuff self.prev_pos.0 = self.prev_pos.1; self.prev_pos.1 += bytes_written; @@ -196,10 +211,6 @@ impl Writer { /// be non-sense. 
pub fn write(&mut self, item: InternalValue) -> crate::Result<()> { if item.is_tombstone() { - if self.opts.evict_tombstones { - return Ok(()); - } - self.meta.tombstone_count += 1; } @@ -215,9 +226,12 @@ impl Writer { .push(BloomFilter::get_hash(&item.key.user_key)); } - let item_key = item.key.clone(); let seqno = item.key.seqno; + if self.meta.first_key.is_none() { + self.meta.first_key = Some(item.key.clone().user_key); + } + self.chunk_size += item.size(); self.chunk.push(item); @@ -226,11 +240,6 @@ impl Writer { self.chunk_size = 0; } - if self.meta.first_key.is_none() { - self.meta.first_key = Some(item_key.user_key.clone()); - } - self.meta.last_key = Some(item_key.user_key); - if self.meta.lowest_seqno > seqno { self.meta.lowest_seqno = seqno; } @@ -254,7 +263,7 @@ impl Writer { return Ok(None); } - let index_block_ptr = self.block_writer.stream_position()?; + let index_block_ptr = BlockOffset(self.block_writer.stream_position()?); log::trace!("index_block_ptr={index_block_ptr}"); // Append index blocks to file @@ -282,27 +291,27 @@ impl Writer { filter.encode_into(&mut self.block_writer)?; - bloom_ptr + BlockOffset(bloom_ptr) }; #[cfg(not(feature = "bloom"))] - let bloom_ptr = 0; + let bloom_ptr = BlockOffset(0); log::trace!("bloom_ptr={bloom_ptr}"); // TODO: #46 https://github.com/fjall-rs/lsm-tree/issues/46 - Write range filter - let rf_ptr = 0; + let rf_ptr = BlockOffset(0); log::trace!("rf_ptr={rf_ptr}"); // TODO: #2 https://github.com/fjall-rs/lsm-tree/issues/2 - Write range tombstones - let range_tombstones_ptr = 0; + let range_tombstones_ptr = BlockOffset(0); log::trace!("range_tombstones_ptr={range_tombstones_ptr}"); // TODO: - let pfx_ptr = 0; + let pfx_ptr = BlockOffset(0); log::trace!("pfx_ptr={pfx_ptr}"); // Write metadata - let metadata_ptr = self.block_writer.stream_position()?; + let metadata_ptr = BlockOffset(self.block_writer.stream_position()?); let metadata = Metadata::from_writer(self.opts.segment_id, self)?; metadata.encode_into(&mut 
self.block_writer)?; @@ -362,7 +371,6 @@ mod tests { let mut writer = Writer::new(Options { folder: folder.clone(), - evict_tombstones: false, data_block_size: 4_096, index_block_size: 4_096, segment_id, @@ -406,7 +414,7 @@ mod tests { table, (0, segment_id).into(), block_cache, - 0, + BlockOffset(0), None, ); @@ -426,7 +434,6 @@ mod tests { let mut writer = Writer::new(Options { folder: folder.clone(), - evict_tombstones: false, data_block_size: 4_096, index_block_size: 4_096, segment_id, @@ -462,7 +469,7 @@ mod tests { table, (0, segment_id).into(), block_cache, - 0, + BlockOffset(0), None, ); diff --git a/src/tree/mod.rs b/src/tree/mod.rs index be0a701..d7f4a52 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -48,6 +48,16 @@ impl std::ops::Deref for Tree { } impl AbstractTree for Tree { + #[cfg(feature = "bloom")] + fn bloom_filter_size(&self) -> usize { + self.levels + .read() + .expect("lock is poisoned") + .iter() + .map(|x| x.bloom_filter.len()) + .sum() + } + fn sealed_memtable_count(&self) -> usize { self.sealed_memtables .read() @@ -133,7 +143,6 @@ impl AbstractTree for Tree { let mut segment_writer = Writer::new(Options { segment_id, folder, - evict_tombstones: false, data_block_size: self.config.data_block_size, index_block_size: self.config.index_block_size, })? @@ -501,10 +510,10 @@ impl Tree { use crate::coding::Decode; use std::io::Seek; - assert!(bloom_ptr > 0, "can not find bloom filter block"); + assert!(*bloom_ptr > 0, "can not find bloom filter block"); let mut reader = std::fs::File::open(&segment_file_path)?; - reader.seek(std::io::SeekFrom::Start(bloom_ptr))?; + reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?; BloomFilter::decode_from(&mut reader)? 
}, } @@ -608,7 +617,7 @@ impl Tree { fn get_internal_entry_from_segments>( &self, key: K, - evict_tombstone: bool, + evict_tombstone: bool, // TODO: remove?, just always true seqno: Option, ) -> crate::Result> { // NOTE: Create key hash for hash sharing @@ -620,34 +629,39 @@ impl Tree { for level in &level_manifest.levels { // NOTE: Based on benchmarking, binary search is only worth it after ~4 segments - if level.is_disjoint && level.len() >= 5 { - if let Some(segment) = level.get_segment_containing_key(&key) { - #[cfg(not(feature = "bloom"))] - let maybe_item = segment.get(&key, seqno)?; - #[cfg(feature = "bloom")] - let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; - - if let Some(item) = maybe_item { - if evict_tombstone { - return Ok(ignore_tombstone_value(item)); + if level.len() >= 5 { + if let Some(level) = level.as_disjoint() { + if let Some(segment) = level.get_segment_containing_key(&key) { + #[cfg(not(feature = "bloom"))] + let maybe_item = segment.get(&key, seqno)?; + #[cfg(feature = "bloom")] + let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; + + if let Some(item) = maybe_item { + if evict_tombstone { + return Ok(ignore_tombstone_value(item)); + } + return Ok(Some(item)); } - return Ok(Some(item)); + } else { + // NOTE: Don't go to fallback, go to next level instead + continue; } } - } else { - // NOTE: Fallback to linear search - for segment in &level.segments { - #[cfg(not(feature = "bloom"))] - let maybe_item = segment.get(&key, seqno)?; - #[cfg(feature = "bloom")] - let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; - - if let Some(item) = maybe_item { - if evict_tombstone { - return Ok(ignore_tombstone_value(item)); - } - return Ok(Some(item)); + } + + // NOTE: Fallback to linear search + for segment in &level.segments { + #[cfg(not(feature = "bloom"))] + let maybe_item = segment.get(&key, seqno)?; + #[cfg(feature = "bloom")] + let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; + + if let 
Some(item) = maybe_item { + if evict_tombstone { + return Ok(ignore_tombstone_value(item)); } + return Ok(Some(item)); } } } @@ -659,7 +673,7 @@ impl Tree { pub fn get_internal_entry>( &self, key: K, - evict_tombstone: bool, + evict_tombstone: bool, // TODO: remove?, just always true seqno: Option, ) -> crate::Result> { // TODO: consolidate memtable & sealed behind single RwLock diff --git a/tests/tree_non_disjoint_point_read.rs b/tests/tree_non_disjoint_point_read.rs new file mode 100644 index 0000000..b829da6 --- /dev/null +++ b/tests/tree_non_disjoint_point_read.rs @@ -0,0 +1,55 @@ +use lsm_tree::{AbstractTree, Config}; +use test_log::test; + +#[test] +fn tree_non_disjoint_point_read() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?.into_path(); + + let tree = Config::new(folder) + .data_block_size(1_024) + .index_block_size(1_024) + .open()?; + + tree.insert("a", "a", 0); + tree.insert("c", "c", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("d", "d", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("e", "e", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("f", "f", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("g", "g", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("h", "h", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.insert("a", "a", 0); + tree.insert("z", "z", 0); + tree.flush_active_memtable(0)?; + + tree.get("c").unwrap().unwrap(); + tree.get("d").unwrap().unwrap(); + tree.get("e").unwrap().unwrap(); + tree.get("f").unwrap().unwrap(); + tree.get("g").unwrap().unwrap(); + tree.get("h").unwrap().unwrap(); + + Ok(()) +}