vecdb fix 2 #378

Open · wants to merge 1 commit into main
8 changes: 5 additions & 3 deletions src/vecdb/vdb_lance.rs
@@ -146,7 +146,7 @@ impl VecDBHandler {
             self.schema.clone(),
         );

-        tracing::info!("vecdb_records_add: adding {} records", records.len());
+        tracing::info!("adding {} records", records.len());
         if let Err(err) = self.data_table.add(
             data_batches_iter, Option::from(WriteParams {
                 mode: WriteMode::Append,
@@ -181,8 +181,10 @@ impl VecDBHandler {
                 tracing::error!("Error deleting from vecdb: {:?}", err);
             }
         }
-        let cnt = self.data_table.count_deleted_rows().await.unwrap();
-        tracing::info!("deleted {} records", cnt);
+
+        // unfortunately this gives wrong numbers (37 instead of 20), lance is garbage :/
+        // let cnt = self.data_table.count_deleted_rows().await.unwrap();
+        // tracing::info!("deleted {} records", cnt);
     }
 }

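One way around the unreliable count_deleted_rows figure is to tally removals on the caller's side instead of asking the table afterwards. Below is a minimal, self-contained Rust sketch of that idea; FakeStore and delete_where_file are hypothetical stand-ins, not the lancedb API used by VecDBHandler.

// Hypothetical in-memory stand-in for the vector table; not lancedb.
struct FakeStore {
    rows: Vec<(String, Vec<f32>)>, // (file_name, embedding)
}

impl FakeStore {
    // Remove every row belonging to `file` and report how many were removed.
    fn delete_where_file(&mut self, file: &str) -> usize {
        let before = self.rows.len();
        self.rows.retain(|(f, _)| f != file);
        before - self.rows.len()
    }
}

fn main() {
    let mut store = FakeStore {
        rows: vec![
            ("a.rs".to_string(), vec![0.0]),
            ("a.rs".to_string(), vec![0.1]),
            ("b.rs".to_string(), vec![0.2]),
        ],
    };
    let mut deleted = 0usize;
    for file in ["a.rs", "c.rs"] {
        deleted += store.delete_where_file(file); // count on our side, per file
    }
    println!("deleted {} records", deleted); // prints 2, independent of any table-side counter
}

This reproduces what the removed log line was trying to report, without relying on the table's own deleted-row bookkeeping.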
23 changes: 15 additions & 8 deletions src/vecdb/vdb_thread.rs
@@ -96,7 +96,7 @@ async fn vectorize_batch_from_q(
     #[allow(non_snake_case)]
     B: usize,
 ) -> Result<(), String> {
-    let batch = run_actual_model_on_these.drain(..B.min(run_actual_model_on_these.len())).collect::<Vec<_>>();
+    let batch = run_actual_model_on_these.drain(.. B.min(run_actual_model_on_these.len())).collect::<Vec<_>>();
     assert!(batch.len() > 0);

     let batch_result = get_embedding_with_retry(
@@ -210,7 +210,6 @@ async fn vectorize_thread(
     let mut reported_unprocessed: usize = 0;
     let mut run_actual_model_on_these: Vec<SplitResult> = vec![];
     let mut ready_to_vecdb: Vec<VecdbRecord> = vec![];
-    // let mut delayed_cached_splits_q: Vec<SplitResult> = vec![];

     loop {
         let (msg_to_me, files_unprocessed, vstatus_changed) = {
@@ -239,8 +238,12 @@ async fn vectorize_thread(
             vstatus_notify.notify_waiters();
         }

+        let flush = ready_to_vecdb.len() > 100 || files_unprocessed == 0 || msg_to_me.is_none();
         loop {
-            if run_actual_model_on_these.len() >= constants.embedding_batch || (!run_actual_model_on_these.is_empty() && files_unprocessed == 0) {
+            if
+                run_actual_model_on_these.len() > 0 && flush ||
+                run_actual_model_on_these.len() >= constants.embedding_batch
+            {
                 if let Err(err) = vectorize_batch_from_q(
                     &mut run_actual_model_on_these,
                     &mut ready_to_vecdb,
@@ -259,6 +262,12 @@ async fn vectorize_thread(
                 }
             }

+            if flush {
+                assert!(run_actual_model_on_these.len() == 0);
+                // This function assumes it can delete records with the filenames mentioned, therefore assert above
+                _send_to_vecdb(vecdb_handler_arc.clone(), &mut ready_to_vecdb).await;
+            }
+
             if (files_unprocessed + 99).div(100) != (reported_unprocessed + 99).div(100) {
                 info!("have {} unprocessed files", files_unprocessed);
                 reported_unprocessed = files_unprocessed;
@@ -292,7 +301,9 @@ async fn vectorize_thread(
                     continue;
                 }
                 None => {
-                    _send_to_vecdb(vecdb_handler_arc.clone(), &mut ready_to_vecdb).await;
+                    // no more files
+                    assert!(run_actual_model_on_these.is_empty());
+                    assert!(ready_to_vecdb.is_empty());
                     let reported_vecdb_complete = {
                         let mut vstatus_locked = vstatus.lock().await;
                         let done = vstatus_locked.state == "done";
@@ -365,10 +376,6 @@ async fn vectorize_thread(
                 vecdb_cache_arc.clone(),
                 1024,
             ).await;
-
-            if ready_to_vecdb.len() > 100 {
-                _send_to_vecdb(vecdb_handler_arc.clone(), &mut ready_to_vecdb).await;
-            }
         }
     }

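Taken together, the vdb_thread.rs hunks replace the size-only trigger with an explicit flush flag: vectorize whenever a full batch is ready or a flush is due, then push everything to the vecdb in the same pass, so nothing stays pending once the queue drains. A self-contained sketch of that shape, with made-up sizes and a stub in place of the real embedding call:

const EMBEDDING_BATCH: usize = 4; // stand-in for constants.embedding_batch

// Stub for the real embedding call: one fake vector per input chunk.
fn embed_batch(batch: &[String]) -> Vec<Vec<f32>> {
    batch.iter().map(|s| vec![s.len() as f32]).collect()
}

fn main() {
    let mut pending: Vec<String> = (0..10).map(|i| format!("chunk{}", i)).collect();
    let mut ready: Vec<Vec<f32>> = Vec::new();
    let files_unprocessed = 0usize; // pretend the file queue just drained

    // Same trigger as the PR: flush when enough results piled up or nothing is left to read.
    let flush = ready.len() > 100 || files_unprocessed == 0;
    loop {
        if pending.len() > 0 && flush || pending.len() >= EMBEDDING_BATCH {
            let batch: Vec<String> = pending.drain(.. EMBEDDING_BATCH.min(pending.len())).collect();
            ready.extend(embed_batch(&batch));
        } else {
            break;
        }
    }
    if flush {
        assert!(pending.is_empty()); // everything was vectorized before sending downstream
        println!("sending {} records to vecdb", ready.len());
        ready.clear(); // stands in for _send_to_vecdb() draining the buffer
    }
}

The vectorize-then-send ordering matters because, per the comment added in the diff, _send_to_vecdb deletes existing records for the mentioned file names before inserting, so nothing for those files may still be mid-flight; that is what the assert guards.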