diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 34040bfa4f..d468084571 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -529,17 +529,39 @@ impl DatasetIndexExt for Dataset {
             .collect::<Result<Vec<_>>>()?;
 
         let index_type = indices[0].index_type().to_string();
-        let unindexed_fragments = self.unindexed_fragments(index_name).await?;
-        let mut num_unindexed_rows = 0;
-        for f in unindexed_fragments.iter() {
-            num_unindexed_rows += f.num_rows().ok_or(Error::Index {
-                message: format!("fragment {} has no rows", f.id),
-                location: location!(),
-            })?;
-        }
-        let num_unindexed_fragments = unindexed_fragments.len();
-        let num_indexed_fragments = self.fragments().len() - num_unindexed_fragments;
-        let num_indexed_rows = self.count_rows(None).await? - num_unindexed_rows;
+
+        // One entry per delta index: the fragments of this dataset that the delta covers.
+        let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
+        let num_indexed_rows_per_delta = indexed_fragments_per_delta
+            .iter()
+            .map(|frags| {
+                frags
+                    .iter()
+                    .map(|f| {
+                        f.num_rows().ok_or_else(|| Error::Index {
+                            message: format!(
+                                "fragment {} has no row count, please upgrade lance and trigger a single write to fix this",
+                                f.id
+                            ),
+                            location: location!(),
+                        })
+                    })
+                    .sum::<Result<usize>>()
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Ids go through a set so a fragment listed under several deltas is counted once.
+        let num_indexed_fragments = indexed_fragments_per_delta
+            .iter()
+            .flatten()
+            .map(|f| f.id)
+            .collect::<HashSet<_>>()
+            .len();
+
+        let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments;
+        // Total over every delta, not only the last one (assumes deltas cover disjoint fragments).
+        let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum();
+        let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows;
 
         let stats = json!({
             "index_type": index_type,
@@ -550,6 +572,7 @@ impl DatasetIndexExt for Dataset {
             "num_indexed_rows": num_indexed_rows,
             "num_unindexed_fragments": num_unindexed_fragments,
             "num_unindexed_rows": num_unindexed_rows,
+            "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
         });
 
         serde_json::to_string(&stats).map_err(|e| Error::Index {
@@ -575,6 +598,12 @@ pub trait DatasetIndexInternalExt: DatasetIndexExt {
 
     /// Return the fragments that are not covered by any of the deltas of the index.
     async fn unindexed_fragments(&self, idx_name: &str) -> Result<Vec<Fragment>>;
+
+    /// Return the fragments that are covered by each of the deltas of the index.
+    ///
+    /// The outer `Vec` has one entry per delta; each inner `Vec` lists the
+    /// fragments of the current dataset that the delta's fragment bitmap contains.
+    async fn indexed_fragments(&self, idx_name: &str) -> Result<Vec<Vec<Fragment>>>;
 }
 
 #[async_trait]
@@ -837,6 +866,31 @@ impl DatasetIndexInternalExt for Dataset {
             .cloned()
             .collect())
     }
+
+    async fn indexed_fragments(&self, name: &str) -> Result<Vec<Vec<Fragment>>> {
+        let indices = self.load_indices_by_name(name).await?;
+        indices
+            .iter()
+            .map(|idx| -> Result<Vec<Fragment>> {
+                // Presumably only index metadata written before lance 0.8 lacks
+                // a fragment bitmap, hence the upgrade hint.
+                let fragment_bitmap = idx.fragment_bitmap.as_ref().ok_or_else(|| Error::Index {
+                    message: "Please upgrade lance to 0.8+ to use this function".to_string(),
+                    location: location!(),
+                })?;
+
+                // The bitmap holds u32 ids; an id that does not fit cannot be in it.
+                Ok(self
+                    .fragments()
+                    .iter()
+                    .filter(|f| {
+                        u32::try_from(f.id).map_or(false, |id| fragment_bitmap.contains(id))
+                    })
+                    .cloned()
+                    .collect())
+            })
+            .collect()
+    }
 }
 
 #[cfg(test)]