Skip to content

Commit

Permalink
feat: expose number of rows covered per delta (#2979)
Browse files Browse the repository at this point in the history
This PR adds a field to `index_stats` where we now return the number of
rows covered by each delta in `num_indexed_rows_per_delta`
  • Loading branch information
chebbyChefNEQ authored Oct 15, 2024
1 parent f98ffdd commit 81554cd
Showing 1 changed file with 46 additions and 11 deletions.
57 changes: 46 additions & 11 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,17 +529,26 @@ impl DatasetIndexExt for Dataset {
.collect::<Result<Vec<_>>>()?;

let index_type = indices[0].index_type().to_string();
let unindexed_fragments = self.unindexed_fragments(index_name).await?;
let mut num_unindexed_rows = 0;
for f in unindexed_fragments.iter() {
num_unindexed_rows += f.num_rows().ok_or(Error::Index {
message: format!("fragment {} has no rows", f.id),
location: location!(),
})?;
}
let num_unindexed_fragments = unindexed_fragments.len();
let num_indexed_fragments = self.fragments().len() - num_unindexed_fragments;
let num_indexed_rows = self.count_rows(None).await? - num_unindexed_rows;

let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
let num_indexed_rows_per_delta = self.indexed_fragments(index_name).await?
.iter()
.map(|frags| {
frags.iter().map(|f| f.num_rows().expect("Fragment should have row counts, please upgrade lance and trigger a single right to fix this")).sum::<usize>()
})
.collect::<Vec<_>>();

let num_indexed_fragments = indexed_fragments_per_delta
.clone()
.into_iter()
.flatten()
.map(|f| f.id)
.collect::<HashSet<_>>()
.len();

let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
let num_indexed_rows = num_indexed_rows_per_delta.iter().last().unwrap();
let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;

let stats = json!({
"index_type": index_type,
Expand All @@ -550,6 +559,7 @@ impl DatasetIndexExt for Dataset {
"num_indexed_rows": num_indexed_rows,
"num_unindexed_fragments": num_unindexed_fragments,
"num_unindexed_rows": num_unindexed_rows,
"num_indexed_rows_per_delta": num_indexed_rows_per_delta,
});

serde_json::to_string(&stats).map_err(|e| Error::Index {
Expand All @@ -575,6 +585,9 @@ pub trait DatasetIndexInternalExt: DatasetIndexExt {

/// Return the fragments that are not covered by any of the deltas of the index.
async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;

/// Return the fragments that are covered by each of the deltas of the index.
async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;
}

#[async_trait]
Expand Down Expand Up @@ -837,6 +850,28 @@ impl DatasetIndexInternalExt for Dataset {
.cloned()
.collect())
}

async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
let indices = self.load_indices_by_name(name).await?;
let mut res = vec![];
for idx in indices.iter() {
let mut total_fragment_bitmap = RoaringBitmap::new();
total_fragment_bitmap |= idx.fragment_bitmap.as_ref().ok_or(Error::Index {
message: "Please upgrade lance to 0.8+ to use this function".to_string(),
location: location!(),
})?;

res.push(
self.fragments()
.iter()
.filter(|f| total_fragment_bitmap.contains(f.id as u32))
.cloned()
.collect(),
);
}

Ok(res)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 81554cd

Please sign in to comment.