Skip to content

Commit

Permalink
Merge pull request #61 from fjall-rs/2.1.0
Browse files Browse the repository at this point in the history
2.1.0
  • Loading branch information
marvin-j97 authored Oct 6, 2024
2 parents 659d444 + aeca336 commit d466a07
Show file tree
Hide file tree
Showing 38 changed files with 1,066 additions and 401 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "lsm-tree"
description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)"
license = "MIT OR Apache-2.0"
version = "2.0.2"
version = "2.1.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand Down Expand Up @@ -34,10 +34,11 @@ lz4_flex = { version = "0.11.3", optional = true }
miniz_oxide = { version = "0.8.0", optional = true }
path-absolutize = "3.1.1"
quick_cache = { version = "0.6.5", default-features = false, features = [] }
rustc-hash = "2.0.0"
self_cell = "1.0.4"
smallvec = { version = "1.13.2" }
tempfile = "3.12.0"
value-log = "1.0.0"
value-log = "1.1.0"
varint-rs = "2.2.0"
xxhash-rust = { version = "0.8.12", features = ["xxh3"] }

Expand Down
57 changes: 52 additions & 5 deletions benches/block.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::{
coding::Encode,
segment::{
block::{header::Header as BlockHeader, ItemSize},
meta::CompressionType,
value_block::ValueBlock,
},
serde::Serializable,
Checksum, InternalValue,
};
use std::io::Write;
Expand Down Expand Up @@ -99,13 +99,56 @@ fn value_block_find(c: &mut Criterion) {
}
}

/// Benchmarks `ValueBlock::to_bytes_compressed` (block serialization) across
/// compression types and target block sizes.
fn encode_block(c: &mut Criterion) {
    let mut group = c.benchmark_group("Encode block");

    for comp_type in [
        CompressionType::None,
        CompressionType::Lz4,
        CompressionType::Miniz(3),
    ] {
        // Keep the KiB count separate from the byte count so the benchmark
        // label reads "1 KiB", not "1024 KiB" (previous label used the
        // byte value while claiming KiB units).
        for block_size_kib in [1, 4, 8, 16, 32, 64, 128] {
            let block_size = block_size_kib * 1_024;

            let mut size = 0;
            let mut items = vec![];

            // Fill the block with synthetic KVs until the (uncompressed)
            // payload reaches the target size.
            for x in 0u64.. {
                let value = InternalValue::from_components(
                    x.to_be_bytes(),
                    x.to_string().repeat(50).as_bytes(),
                    63,
                    lsm_tree::ValueType::Value,
                );

                size += value.size();
                items.push(value);

                if size >= block_size {
                    break;
                }
            }

            group.bench_function(format!("{block_size_kib} KiB [{comp_type}]"), |b| {
                b.iter(|| {
                    // Serialize block. black_box keeps the otherwise-unused
                    // result alive so the optimizer cannot elide the work
                    // being measured (also removes unused-variable warnings).
                    std::hint::black_box(
                        ValueBlock::to_bytes_compressed(&items, 0, comp_type).unwrap(),
                    );
                });
            });
        }
    }
}

fn load_value_block_from_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Load block from disk");

for comp_type in [
CompressionType::None,
//CompressionType::None,
CompressionType::Lz4,
CompressionType::Miniz(6),
//CompressionType::Miniz(3),
] {
for block_size in [1, 4, 8, 16, 32, 64, 128] {
let block_size = block_size * 1_024;
Expand Down Expand Up @@ -133,7 +176,6 @@ fn load_value_block_from_disk(c: &mut Criterion) {

// Serialize block
let (mut header, data) = ValueBlock::to_bytes_compressed(&items, 0, comp_type).unwrap();
header.checksum = Checksum::from_bytes(&data);

let mut file = tempfile::tempfile().unwrap();
header.encode_into(&mut file).unwrap();
Expand All @@ -156,5 +198,10 @@ fn load_value_block_from_disk(c: &mut Criterion) {
}
}

criterion_group!(benches, value_block_find, load_value_block_from_disk,);
criterion_group!(
benches,
encode_block,
value_block_find,
load_value_block_from_disk,
);
criterion_main!(benches);
8 changes: 5 additions & 3 deletions benches/level_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn iterate_segments(c: &mut Criterion) {
group.sample_size(10);

for segment_count in [0, 1, 5, 10, 100, 500, 1_000, 2_000, 4_000] {
group.bench_function(&format!("iterate {segment_count} segments"), |b| {
group.bench_function(format!("iterate {segment_count} segments"), |b| {
let folder = tempfile::tempdir_in(".bench").unwrap();
let tree = Config::new(folder).data_block_size(1_024).open().unwrap();

Expand All @@ -30,7 +30,7 @@ fn find_segment(c: &mut Criterion) {

for segment_count in [1u64, 5, 10, 100, 500, 1_000, 2_000, 4_000] {
group.bench_function(
&format!("find segment in {segment_count} segments - binary search"),
format!("find segment in {segment_count} segments - binary search"),
|b| {
let folder = tempfile::tempdir_in(".bench").unwrap();
let tree = Config::new(folder).data_block_size(1_024).open().unwrap();
Expand All @@ -49,14 +49,16 @@ fn find_segment(c: &mut Criterion) {
.levels
.first()
.expect("should exist")
.as_disjoint()
.expect("should be disjoint")
.get_segment_containing_key(key)
.expect("should exist")
});
},
);

group.bench_function(
&format!("find segment in {segment_count} segments - linear search"),
format!("find segment in {segment_count} segments - linear search"),
|b| {
let folder = tempfile::tempdir().unwrap();
let tree = Config::new(folder).data_block_size(1_024).open().unwrap();
Expand Down
6 changes: 4 additions & 2 deletions benches/tli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::segment::{block_index::BlockIndex, value_block::CachePolicy};
use lsm_tree::segment::{
block_index::BlockIndex, value_block::BlockOffset, value_block::CachePolicy,
};

fn tli_find_item(c: &mut Criterion) {
use lsm_tree::segment::block_index::{
Expand All @@ -15,7 +17,7 @@ fn tli_find_item(c: &mut Criterion) {
for x in 0..item_count {
items.push(KeyedBlockHandle {
end_key: x.to_be_bytes().into(),
offset: x,
offset: BlockOffset(x),
});
}

Expand Down
4 changes: 4 additions & 0 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub type RangeItem = crate::Result<KvPair>;
#[allow(clippy::module_name_repetitions)]
#[enum_dispatch]
pub trait AbstractTree {
/// Gets the memory usage of all bloom filters in the tree.
#[cfg(feature = "bloom")]
fn bloom_filter_size(&self) -> usize;

/* /// Imports data from a flat file (see [`Tree::export`]),
/// blocking the caller until it is done.
///
Expand Down
6 changes: 5 additions & 1 deletion src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ impl BlobTree {
}

impl AbstractTree for BlobTree {
#[cfg(feature = "bloom")]
fn bloom_filter_size(&self) -> usize {
self.index.bloom_filter_size()
}

fn sealed_memtable_count(&self) -> usize {
self.index.sealed_memtable_count()
}
Expand Down Expand Up @@ -231,7 +236,6 @@ impl AbstractTree for BlobTree {
segment_id,
data_block_size: self.index.config.data_block_size,
index_block_size: self.index.config.index_block_size,
evict_tombstones: false,
folder: lsm_segment_folder,
})?
.use_compression(self.index.config.compression);
Expand Down
40 changes: 24 additions & 16 deletions src/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::either::Either::{self, Left, Right};
use crate::segment::id::GlobalSegmentId;
use crate::segment::value_block::BlockOffset;
use crate::segment::{block_index::IndexBlock, value_block::ValueBlock};
use quick_cache::Weighter;
use quick_cache::{sync::Cache, Equivalent};
Expand All @@ -13,16 +14,16 @@ type Item = Either<Arc<ValueBlock>, Arc<IndexBlock>>;

// (Type (disk or index), Segment ID, Block offset)
#[derive(Eq, std::hash::Hash, PartialEq)]
struct CacheKey(GlobalSegmentId, u64);
struct CacheKey(GlobalSegmentId, BlockOffset);

impl Equivalent<CacheKey> for (GlobalSegmentId, u64) {
impl Equivalent<CacheKey> for (GlobalSegmentId, BlockOffset) {
fn equivalent(&self, key: &CacheKey) -> bool {
self.0 == key.0 && self.1 == key.1
}
}

impl From<(GlobalSegmentId, u64)> for CacheKey {
fn from((gid, bid): (GlobalSegmentId, u64)) -> Self {
impl From<(GlobalSegmentId, BlockOffset)> for CacheKey {
fn from((gid, bid): (GlobalSegmentId, BlockOffset)) -> Self {
Self(gid, bid)
}
}
Expand Down Expand Up @@ -65,7 +66,11 @@ impl Weighter<CacheKey, Item> for BlockWeighter {
/// # Ok::<(), lsm_tree::Error>(())
/// ```
pub struct BlockCache {
data: Cache<CacheKey, Item, BlockWeighter, xxhash_rust::xxh3::Xxh3Builder>,
// NOTE: rustc_hash performed best: https://fjall-rs.github.io/post/fjall-2-1
/// Concurrent cache implementation
data: Cache<CacheKey, Item, BlockWeighter, rustc_hash::FxBuildHasher>,

/// Capacity in bytes
capacity: u64,
}

Expand All @@ -75,14 +80,17 @@ impl BlockCache {
pub fn with_capacity_bytes(bytes: u64) -> Self {
use quick_cache::sync::DefaultLifecycle;

#[allow(clippy::default_trait_access)]
let quick_cache = Cache::with(
1_000_000,
bytes,
BlockWeighter,
Default::default(),
DefaultLifecycle::default(),
);

Self {
data: Cache::with(
1_000_000,
bytes,
BlockWeighter,
xxhash_rust::xxh3::Xxh3Builder::new(),
DefaultLifecycle::default(),
),
data: quick_cache,
capacity: bytes,
}
}
Expand Down Expand Up @@ -115,7 +123,7 @@ impl BlockCache {
pub fn insert_disk_block(
&self,
segment_id: GlobalSegmentId,
offset: u64,
offset: BlockOffset,
value: Arc<ValueBlock>,
) {
if self.capacity > 0 {
Expand All @@ -127,7 +135,7 @@ impl BlockCache {
pub fn insert_index_block(
&self,
segment_id: GlobalSegmentId,
offset: u64,
offset: BlockOffset,
value: Arc<IndexBlock>,
) {
if self.capacity > 0 {
Expand All @@ -140,7 +148,7 @@ impl BlockCache {
pub fn get_disk_block(
&self,
segment_id: GlobalSegmentId,
offset: u64,
offset: BlockOffset,
) -> Option<Arc<ValueBlock>> {
let key = (segment_id, offset);
let item = self.data.get(&key)?;
Expand All @@ -152,7 +160,7 @@ impl BlockCache {
pub fn get_index_block(
&self,
segment_id: GlobalSegmentId,
offset: u64,
offset: BlockOffset,
) -> Option<Arc<IndexBlock>> {
let key = (segment_id, offset);
let item = self.data.get(&key)?;
Expand Down
Loading

0 comments on commit d466a07

Please sign in to comment.