Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: BubbleCal <[email protected]>
  • Loading branch information
BubbleCal committed Oct 17, 2024
1 parent fe5fcf3 commit 76795c8
Showing 1 changed file with 29 additions and 21 deletions.
50 changes: 29 additions & 21 deletions rust/lance-index/src/scalar/inverted/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,31 @@ use tracing::instrument;
use super::index::*;

lazy_static! {
static ref DOC_CHUNK_SIZE: usize = std::env::var("DOC_CHUNK_SIZE")
.unwrap_or_else(|_| "2048".to_string())
.parse()
.expect("failed to parse DOC_CHUNK_SIZE");
static ref FLUSH_THRESHOLD: usize = std::env::var("FLUSH_THRESHOLD")
// The size threshold (in MiB) that triggers flushing the posting lists while indexing FTS.
// A lower value results in slower indexing but less memory usage.
// Defaults to 256 MiB.
static ref LANCE_FTS_FLUSH_THRESHOLD: usize = std::env::var("LANCE_FTS_FLUSH_THRESHOLD")
.unwrap_or_else(|_| "256".to_string())
.parse()
.expect("failed to parse FLUSH_THRESHOLD");
static ref FLUSH_SIZE: usize = std::env::var("FLUSH_SIZE")
.expect("failed to parse LANCE_FTS_FLUSH_THRESHOLD");
// The size (in MiB) of each flush. A lower value results in more frequent flushes but better IO locality.
// Once `LANCE_FTS_FLUSH_THRESHOLD` is reached, a flush is triggered and posting lists are flushed
// until the total size of the flushed posting lists reaches `LANCE_FTS_FLUSH_SIZE`.
// Defaults to 64 MiB.
static ref LANCE_FTS_FLUSH_SIZE: usize = std::env::var("LANCE_FTS_FLUSH_SIZE")
.unwrap_or_else(|_| "64".to_string())
.parse()
.expect("failed to parse FLUSH_SIZE");
static ref NUM_SHARDS: usize = std::env::var("NUM_SHARDS")
.unwrap_or_else(|_| "8".to_string())
.parse()
.expect("failed to parse NUM_SHARDS");
static ref CHANNEL_SIZE: usize = std::env::var("CHANNEL_SIZE")
.expect("failed to parse LANCE_FTS_FLUSH_SIZE");
// The number of shards to split the indexing work into.
// The indexing process spawns `LANCE_FTS_NUM_SHARDS` workers to build the FTS index.
// A higher value gives better parallelism but uses more memory.
// A higher value does not necessarily give better performance,
// because IO can become the bottleneck once the number of shards is large enough.
// Defaults to 8.
static ref LANCE_FTS_NUM_SHARDS: usize = std::env::var("LANCE_FTS_NUM_SHARDS")
.unwrap_or_else(|_| "8".to_string())
.parse()
.expect("failed to parse CHANNEL_SIZE");
.expect("failed to parse LANCE_FTS_NUM_SHARDS");
}

#[derive(Debug, Default, DeepSizeOf)]
Expand Down Expand Up @@ -105,18 +110,21 @@ impl InvertedIndexBuilder {

#[instrument(level = "debug", skip_all)]
async fn update_index(&mut self, stream: SendableRecordBatchStream) -> Result<()> {
let num_shards = *NUM_SHARDS;
let num_shards = *LANCE_FTS_NUM_SHARDS;

// init the token maps
let mut token_maps = (0..num_shards).map(|_| HashMap::new()).collect_vec();
let mut token_maps = vec![HashMap::new(); num_shards];
for (token, token_id) in self.tokens.tokens.iter() {
let mut hasher = DefaultHasher::new();
hasher.write(token.as_bytes());
let shard = hasher.finish() as usize % num_shards;
token_maps[shard].insert(token.clone(), *token_id);
}

// spawn workers to build the index
// Spawn `num_shards` workers to build the index.
// This thread consumes the stream and sends the tokens to the workers.
// Because the workers can be CPU intensive, we limit the concurrency of consuming the stream
// to `num_cpus - num_shards`; going above `num_shards` may not help much, so it is also capped at `num_shards`.
let buffer_size = get_num_compute_intensive_cpus()
.saturating_sub(num_shards)
.max(1)
Expand All @@ -127,7 +135,7 @@ impl InvertedIndexBuilder {
let inverted_list = self.inverted_list.clone();
let mut worker = IndexWorker::new(token_map, self.params.with_position).await?;

let (sender, mut receiver) = tokio::sync::mpsc::channel(*CHANNEL_SIZE);
let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
senders.push(sender);
result_futs.push(tokio::spawn({
async move {
Expand Down Expand Up @@ -448,7 +456,7 @@ impl IndexWorker {
self.estimated_size += new_size - old_size;
});

if self.estimated_size > *FLUSH_THRESHOLD * 1024 * 1024 {
if self.estimated_size > *LANCE_FTS_FLUSH_THRESHOLD * 1024 * 1024 {
self.flush(false).await?;
}

Expand Down Expand Up @@ -476,7 +484,7 @@ impl IndexWorker {
for key in keys {
flushed_size += self.flush_posting_list(key).await?;
count += 1;
if !flush_all && flushed_size >= *FLUSH_SIZE * 1024 * 1024 {
if !flush_all && flushed_size >= *LANCE_FTS_FLUSH_SIZE * 1024 * 1024 {
break;
}
}
Expand Down Expand Up @@ -639,7 +647,7 @@ impl PostingReader {
});

let stream = stream::iter(inverted_batches)
.buffer_unordered(get_num_compute_intensive_cpus().div_ceil(*NUM_SHARDS));
.buffer_unordered(get_num_compute_intensive_cpus().div_ceil(*LANCE_FTS_NUM_SHARDS));
Ok(Box::pin(stream))
}
}
Expand Down

0 comments on commit 76795c8

Please sign in to comment.