diff --git a/protos/table.proto b/protos/table.proto index fe10f53f88..2933a6d093 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -85,6 +85,8 @@ message Manifest { // * 1: deletion files are present // * 2: move_stable_row_ids: row IDs are tracked and stable after move operations // (such as compaction), but not updates. + // * 4: use v2 format (deprecated) + // * 8: table config is present uint64 reader_feature_flags = 9; // Feature flags for writers. @@ -137,6 +139,13 @@ message Manifest { // // This specifies what format is used to store the data files. DataStorageFormat data_format = 15; + + // Table config. + // + // Keys with the prefix "lance." are reserved for the Lance library. Other + // libraries may wish to similarly prefix their configuration keys + // appropriately. + map config = 16; } // Manifest // Auxiliary Data attached to a version. diff --git a/protos/transaction.proto b/protos/transaction.proto index f76653224d..8da320f915 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -61,6 +61,8 @@ message Transaction { repeated lance.file.Field schema = 2; // Schema metadata. map schema_metadata = 3; + // Key-value pairs to merge with existing config. + map config_upsert_values = 4; } // Add or replace a new secondary index. @@ -156,6 +158,12 @@ message Transaction { // The new fragments where updated rows have been moved to. repeated DataFragment new_fragments = 3; } + + // An operation that updates the table config. + message UpdateConfig { + map upsert_values = 1; + repeated string delete_keys = 2; + } // The operation of this transaction. oneof operation { @@ -169,5 +177,6 @@ message Transaction { ReserveFragments reserve_fragments = 107; Update update = 108; Project project = 109; + UpdateConfig update_config = 110; } } \ No newline at end of file diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 396321db8e..31bbf3d544 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -269,7 +269,11 @@ impl Operation { ) -> PyResult { let schema = convert_schema(&schema.0)?; let fragments = into_fragments(fragments); - let op = LanceOperation::Overwrite { fragments, schema }; + let op = LanceOperation::Overwrite { + fragments, + schema, + config_upsert_values: None, + }; Ok(Self(op)) } diff --git a/rust/lance-table/README.md b/rust/lance-table/README.md index da574d1ea0..37a4a6d92d 100644 --- a/rust/lance-table/README.md +++ b/rust/lance-table/README.md @@ -1,6 +1,6 @@ -# lance-file +# lance-table -`lance-file` is an internal sub-crate, containing readers and writers for the +`lance-table` is an internal sub-crate for the [Lance table format](https://lancedb.github.io/lance/format.html). **Important Note**: This crate is **not intended for external usage**. diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index 051f1ace4b..1edeb520d5 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -16,8 +16,10 @@ pub const FLAG_DELETION_FILES: u64 = 1; pub const FLAG_MOVE_STABLE_ROW_IDS: u64 = 2; /// Files are written with the new v2 format (this flag is no longer used) pub const FLAG_USE_V2_FORMAT_DEPRECATED: u64 = 4; +/// Table config is present +pub const FLAG_TABLE_CONFIG: u64 = 8; /// The first bit that is unknown as a feature flag -pub const FLAG_UNKNOWN: u64 = 8; +pub const FLAG_UNKNOWN: u64 = 16; /// Set the reader and writer feature flags in the manifest based on the contents of the manifest. pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) -> Result<()> { @@ -55,6 +57,11 @@ pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) manifest.writer_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS; } + // Test whether any table metadata has been set + if !manifest.config.is_empty() { + manifest.writer_feature_flags |= FLAG_TABLE_CONFIG; + } + Ok(()) } @@ -81,7 +88,9 @@ mod tests { assert!(can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS)); assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); assert!(can_read_dataset( - super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS + super::FLAG_DELETION_FILES + | super::FLAG_MOVE_STABLE_ROW_IDS + | super::FLAG_USE_V2_FORMAT_DEPRECATED )); assert!(!can_read_dataset(super::FLAG_UNKNOWN)); } @@ -91,11 +100,13 @@ mod tests { assert!(can_write_dataset(0)); assert!(can_write_dataset(super::FLAG_DELETION_FILES)); assert!(can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS)); - assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); + assert!(can_write_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); + assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED + | super::FLAG_TABLE_CONFIG )); assert!(!can_write_dataset(super::FLAG_UNKNOWN)); } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 9fd92c3703..22ea6cd309 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; @@ -77,6 +78,9 @@ pub struct Manifest { /// The storage format of the data files. pub data_storage_format: DataStorageFormat, + + /// Table configuration. + pub config: HashMap, } fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec { @@ -99,6 +103,7 @@ impl Manifest { data_storage_format: DataStorageFormat, ) -> Self { let fragment_offsets = compute_fragment_offsets(&fragments); + Self { schema, version: 1, @@ -115,6 +120,7 @@ impl Manifest { fragment_offsets, next_row_id: 0, data_storage_format, + config: HashMap::new(), } } @@ -141,6 +147,7 @@ impl Manifest { fragment_offsets, next_row_id: previous.next_row_id, data_storage_format: previous.data_storage_format.clone(), + config: previous.config.clone(), } } @@ -160,6 +167,17 @@ impl Manifest { self.timestamp_nanos = nanos; } + /// Set the `config` from an iterator + pub fn update_config(&mut self, upsert_values: impl IntoIterator) { + self.config.extend(upsert_values); + } + + /// Delete `config` keys using a slice of keys + pub fn delete_config_keys(&mut self, delete_keys: &[&str]) { + self.config + .retain(|key, _| !delete_keys.contains(&key.as_str())); + } + /// Check the current fragment list and update the high water mark pub fn update_max_fragment_id(&mut self) { let max_fragment_id = self @@ -471,6 +489,7 @@ impl TryFrom for Manifest { fragment_offsets, next_row_id: p.next_row_id, data_storage_format, + config: p.config, }) } } @@ -513,6 +532,7 @@ impl From<&Manifest> for pb::Manifest { file_format: m.data_storage_format.file_format.clone(), version: m.data_storage_format.version.clone(), }), + config: m.config.clone(), } } } @@ -692,4 +712,31 @@ mod tests { assert_eq!(manifest.max_field_id(), 43); } + + #[test] + fn test_config() { + let arrow_schema = ArrowSchema::new(vec![ArrowField::new( + "a", + arrow_schema::DataType::Int64, + false, + )]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let fragments = vec![ + Fragment::with_file_legacy(0, "path1", &schema, Some(10)), + Fragment::with_file_legacy(1, "path2", &schema, Some(15)), + Fragment::with_file_legacy(2, "path3", &schema, Some(20)), + ]; + let mut manifest = Manifest::new(schema, Arc::new(fragments), DataStorageFormat::default()); + + let mut config = HashMap::new(); + config.insert("lance:test".to_string(), "value".to_string()); + config.insert("other-key".to_string(), "other-value".to_string()); + + manifest.update_config(config.clone()); + assert_eq!(manifest.config, config.clone()); + + config.remove("other-key"); + manifest.delete_config_keys(&["other-key"]); + assert_eq!(manifest.config, config); + } } diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index c92eb5bdf8..12196c1573 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -243,6 +243,10 @@ mod test { let arrow_schema = ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]); let schema = Schema::try_from(&arrow_schema).unwrap(); + + let mut config = HashMap::new(); + config.insert("key".to_string(), "value".to_string()); + let mut manifest = Manifest::new(schema, Arc::new(vec![]), DataStorageFormat::default()); let pos = write_manifest(&mut writer, &mut manifest, None) .await diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 97fb94e7c3..fbc8f25c67 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -590,7 +590,11 @@ impl Dataset { .await?; let operation = match params.mode { - WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { schema, fragments }, + WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { + schema, + fragments, + config_upsert_values: None, + }, WriteMode::Append => Operation::Append { fragments }, }; @@ -1505,6 +1509,63 @@ impl Dataset { let stream = Box::new(stream); self.merge_impl(stream, left_on, right_on).await } + + /// Update key-value pairs in config. + pub async fn update_config( + &mut self, + upsert_values: impl IntoIterator, + ) -> Result<()> { + let transaction = Transaction::new( + self.manifest.version, + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(upsert_values)), + delete_keys: None, + }, + None, + ); + + let manifest = commit_transaction( + self, + &self.object_store, + self.commit_handler.as_ref(), + &transaction, + &Default::default(), + &Default::default(), + self.manifest_naming_scheme, + ) + .await?; + + self.manifest = Arc::new(manifest); + + Ok(()) + } + + /// Delete keys from the config. + pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> { + let transaction = Transaction::new( + self.manifest.version, + Operation::UpdateConfig { + upsert_values: None, + delete_keys: Some(Vec::from_iter(delete_keys.iter().map(ToString::to_string))), + }, + None, + ); + + let manifest = commit_transaction( + self, + &self.object_store, + self.commit_handler.as_ref(), + &transaction, + &Default::default(), + &Default::default(), + self.manifest_naming_scheme, + ) + .await?; + + self.manifest = Arc::new(manifest); + + Ok(()) + } } #[async_trait::async_trait] @@ -2802,6 +2863,7 @@ mod tests { let operation = Operation::Overwrite { fragments: vec![], schema, + config_upsert_values: None, }; let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -3237,6 +3299,38 @@ mod tests { assert!(fragments[0].metadata.deletion_file.is_some()); } + #[rstest] + #[tokio::test] + async fn test_update_config() { + // Create a table + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::UInt32, + false, + )])); + + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt32Array::from_iter_values(0..100))], + ); + let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema); + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let mut desired_config = HashMap::new(); + desired_config.insert("lance:test".to_string(), "value".to_string()); + desired_config.insert("other-key".to_string(), "other-value".to_string()); + + dataset.update_config(desired_config.clone()).await.unwrap(); + assert_eq!(dataset.manifest.config, desired_config); + + desired_config.remove("other-key"); + dataset.delete_config_keys(&["other-key"]).await.unwrap(); + assert_eq!(dataset.manifest.config, desired_config); + } + #[rstest] #[tokio::test] async fn test_tag( diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 8780834f7f..0640176e4b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2357,6 +2357,7 @@ mod tests { let op = Operation::Overwrite { schema: schema.clone(), fragments, + config_upsert_values: None, }; let registry = Arc::new(ObjectStoreRegistry::default()); @@ -2457,6 +2458,7 @@ mod tests { let op = Operation::Overwrite { fragments: vec![new_fragment], schema: full_schema.clone(), + config_upsert_values: None, }; let registry = Arc::new(ObjectStoreRegistry::default()); diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f4fa80b173..cf5a2e991b 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -22,20 +22,26 @@ //! a conflict. Some operations have additional conditions that must be met for //! them to be compatible. //! -//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | -//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------| -//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | -//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | -//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | -//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | -//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | -//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | +//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | +//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------| +//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | +//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ | +//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) | +//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | +//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ | +//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | +//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | +//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) | //! //! (1) Delete, update, and rewrite are compatible with each other and themselves only if //! they affect distinct fragments. Otherwise, they conflict. +//! (2) Operations that mutate the config conflict if one of the operations upserts a key +//! that if referenced by another concurrent operation. -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use lance_core::{datatypes::Schema, Error, Result}; use lance_file::{datatypes::Fields, version::LanceFileVersion}; @@ -93,6 +99,7 @@ pub enum Operation { Overwrite { fragments: Vec, schema: Schema, + config_upsert_values: Option>, }, /// A new index has been created. CreateIndex { @@ -139,6 +146,12 @@ pub enum Operation { /// Project to a new schema. This only changes the schema, not the data. Project { schema: Schema }, + + /// Update the dataset configuration. + UpdateConfig { + upsert_values: Option>, + delete_keys: Option>, + }, } #[derive(Debug, Clone)] @@ -165,6 +178,7 @@ impl Operation { | Self::CreateIndex { .. } | Self::ReserveFragments { .. } | Self::Project { .. } + | Self::UpdateConfig { .. } | Self::Restore { .. } => Box::new(std::iter::empty()), Self::Delete { updated_fragments, @@ -195,6 +209,38 @@ impl Operation { } } + /// Returns the config keys that have been upserted by this operation. + fn get_upsert_config_keys(&self) -> Vec { + match self { + Self::Overwrite { + config_upsert_values: Some(upsert_values), + .. + } => { + let vec: Vec = upsert_values.keys().cloned().collect(); + vec + } + Self::UpdateConfig { + upsert_values: Some(uv), + .. + } => { + let vec: Vec = uv.keys().cloned().collect(); + vec + } + _ => Vec::::new(), + } + } + + /// Returns the config keys that have been deleted by this operation. + fn get_delete_config_keys(&self) -> Vec { + match self { + Self::UpdateConfig { + delete_keys: Some(dk), + .. + } => dk.clone(), + _ => Vec::::new(), + } + } + /// Check whether another operation modifies the same fragment IDs as this one. fn modifies_same_ids(&self, other: &Self) -> bool { let self_ids = self.modified_fragment_ids().collect::>(); @@ -202,6 +248,22 @@ impl Operation { other_ids.any(|id| self_ids.contains(&id)) } + /// Check whether another operation upserts a key that is referenced by another operation + fn upsert_key_conflict(&self, other: &Self) -> bool { + let self_upsert_keys = self.get_upsert_config_keys(); + let other_upsert_keys = other.get_upsert_config_keys(); + + let self_delete_keys = self.get_delete_config_keys(); + let other_delete_keys = other.get_delete_config_keys(); + + self_upsert_keys + .iter() + .any(|x| other_upsert_keys.contains(x) || other_delete_keys.contains(x)) + || other_upsert_keys + .iter() + .any(|x| self_upsert_keys.contains(x) || self_delete_keys.contains(x)) + } + pub fn name(&self) -> &str { match self { Self::Append { .. } => "Append", @@ -214,6 +276,7 @@ impl Operation { Self::Restore { .. } => "Restore", Self::Update { .. } => "Update", Self::Project { .. } => "Project", + Self::UpdateConfig { .. } => "UpdateConfig", } } } @@ -245,6 +308,7 @@ impl Transaction { Operation::Delete { .. } | Operation::Update { .. } => false, Operation::ReserveFragments { .. } => false, Operation::Project { .. } => false, + Operation::UpdateConfig { .. } => false, _ => true, }, Operation::Rewrite { .. } => match &other.operation { @@ -259,10 +323,10 @@ impl Transaction { self.operation.modifies_same_ids(&other.operation) } Operation::Project { .. } => false, + Operation::UpdateConfig { .. } => false, _ => true, }, - // Overwrite and Restore always succeed - Operation::Overwrite { .. } => false, + // Restore always succeeds Operation::Restore { .. } => false, // ReserveFragments is compatible with anything that doesn't reset the // max fragment id. @@ -285,6 +349,7 @@ impl Transaction { // TODO: we could be smarter here and only invalidate the index // if the rewrite changed more than X% of row ids. Operation::Rewrite { .. } => true, + Operation::UpdateConfig { .. } => false, _ => true, }, Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { @@ -296,18 +361,30 @@ impl Transaction { } Operation::Project { .. } => false, Operation::Append { .. } => false, + Operation::UpdateConfig { .. } => false, _ => true, }, + Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { + match &other.operation { + Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { + self.operation.upsert_key_conflict(&other.operation) + } + _ => false, + } + } // Merge changes the schema, but preserves row ids, so the only operations - // it's compatible with is CreateIndex and ReserveFragments. + // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. Operation::Merge { .. } => !matches!( &other.operation, - Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + Operation::CreateIndex { .. } + | Operation::ReserveFragments { .. } + | Operation::UpdateConfig { .. } ), Operation::Project { .. } => match &other.operation { // Project is compatible with anything that doesn't change the schema Operation::CreateIndex { .. } => false, Operation::Overwrite { .. } => false, + Operation::UpdateConfig { .. } => false, _ => true, }, } @@ -586,6 +663,7 @@ impl Transaction { Operation::Restore { .. } => { unreachable!() } + Operation::UpdateConfig { .. } => {} }; // If a fragment was reserved then it may not belong at the end of the fragments list. @@ -626,6 +704,33 @@ impl Transaction { manifest.update_max_fragment_id(); + match &self.operation { + Operation::Overwrite { + config_upsert_values: Some(tm), + .. + } => manifest.update_config(tm.clone()), + Operation::UpdateConfig { + upsert_values, + delete_keys, + } => { + // Delete is handled first. If the same key is referenced by upsert and + // delete, then upserted key-value pair will remain. + if let Some(delete_keys) = delete_keys { + manifest.delete_config_keys( + delete_keys + .iter() + .map(|s| s.as_str()) + .collect::>() + .as_slice(), + ) + } + if let Some(upsert_values) = upsert_values { + manifest.update_config(upsert_values.clone()); + } + } + _ => {} + } + if let Operation::ReserveFragments { num_fragments } = self.operation { manifest.max_fragment_id += num_fragments; } @@ -824,13 +929,23 @@ impl TryFrom for Transaction { fragments, schema, schema_metadata: _schema_metadata, // TODO: handle metadata - })) => Operation::Overwrite { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - schema: Schema::from(&Fields(schema.clone())), - }, + config_upsert_values, + })) => { + let config_upsert_option = if config_upsert_values.is_empty() { + Some(config_upsert_values) + } else { + None + }; + + Operation::Overwrite { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + schema: Schema::from(&Fields(schema.clone())), + config_upsert_values: config_upsert_option, + } + } Some(pb::transaction::Operation::ReserveFragments( pb::transaction::ReserveFragments { num_fragments }, )) => Operation::ReserveFragments { num_fragments }, @@ -914,6 +1029,23 @@ impl TryFrom for Transaction { schema: Schema::from(&Fields(schema.clone())), } } + Some(pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { + upsert_values, + delete_keys, + })) => { + let upsert_values = match upsert_values.len() { + 0 => None, + _ => Some(upsert_values), + }; + let delete_keys = match delete_keys.len() { + 0 => None, + _ => Some(delete_keys), + }; + Operation::UpdateConfig { + upsert_values, + delete_keys, + } + } None => { return Err(Error::Internal { message: "Transaction message did not contain an operation".to_string(), @@ -1002,11 +1134,18 @@ impl From<&Transaction> for pb::Transaction { deleted_fragment_ids: deleted_fragment_ids.clone(), predicate: predicate.clone(), }), - Operation::Overwrite { fragments, schema } => { + Operation::Overwrite { + fragments, + schema, + config_upsert_values, + } => { pb::transaction::Operation::Overwrite(pb::transaction::Overwrite { fragments: fragments.iter().map(pb::DataFragment::from).collect(), schema: Fields::from(schema).0, schema_metadata: Default::default(), // TODO: handle metadata + config_upsert_values: config_upsert_values + .clone() + .unwrap_or(Default::default()), }) } Operation::ReserveFragments { num_fragments } => { @@ -1062,6 +1201,13 @@ impl From<&Transaction> for pb::Transaction { schema: Fields::from(schema).0, }) } + Operation::UpdateConfig { + upsert_values, + delete_keys, + } => pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { + upsert_values: upsert_values.clone().unwrap_or(Default::default()), + delete_keys: delete_keys.clone().unwrap_or(Default::default()), + }), }; Self { @@ -1102,7 +1248,14 @@ impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup { /// Validate the operation is valid for the given manifest. pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Result<()> { let manifest = match (manifest, operation) { - (None, Operation::Overwrite { fragments, schema }) => { + ( + None, + Operation::Overwrite { + fragments, + schema, + config_upsert_values: None, + }, + ) => { // Validate here because we are going to return early. schema_fragments_valid(schema, fragments)?; @@ -1128,9 +1281,12 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Operation::Project { schema } => { schema_fragments_valid(schema, manifest.fragments.as_ref()) } - Operation::Merge { fragments, schema } | Operation::Overwrite { fragments, schema } => { - schema_fragments_valid(schema, fragments) - } + Operation::Merge { fragments, schema } + | Operation::Overwrite { + fragments, + schema, + config_upsert_values: None, + } => schema_fragments_valid(schema, fragments), Operation::Update { updated_fragments, new_fragments, @@ -1207,6 +1363,10 @@ mod tests { Operation::Overwrite { fragments: vec![fragment0.clone(), fragment2.clone()], schema: Schema::default(), + config_upsert_values: Some(HashMap::from_iter(vec![( + "overwrite-key".to_string(), + "value".to_string(), + )])), }, Operation::Rewrite { groups: vec![RewriteGroup { @@ -1221,6 +1381,13 @@ mod tests { updated_fragments: vec![fragment0.clone()], new_fragments: vec![fragment2.clone()], }, + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "lance.test".to_string(), + "value".to_string(), + )])), + delete_keys: Some(vec!["remove-key".to_string()]), + }, ]; let other_transactions = other_operations .iter() @@ -1234,7 +1401,7 @@ mod tests { Operation::Append { fragments: vec![fragment0.clone()], }, - [false, false, false, true, true, false, false, false], + [false, false, false, true, true, false, false, false, false], ), ( Operation::Delete { @@ -1243,7 +1410,7 @@ mod tests { deleted_fragment_ids: vec![], predicate: "x > 2".to_string(), }, - [false, false, false, true, true, false, false, true], + [false, false, false, true, true, false, false, true, false], ), ( Operation::Delete { @@ -1252,16 +1419,19 @@ mod tests { deleted_fragment_ids: vec![], predicate: "x > 2".to_string(), }, - [false, false, true, true, true, true, false, true], + [false, false, true, true, true, true, false, true, false], ), ( Operation::Overwrite { fragments: vec![fragment0.clone(), fragment2.clone()], schema: Schema::default(), + config_upsert_values: None, }, // No conflicts: overwrite can always happen since it doesn't // depend on previous state of the table. - [false, false, false, false, false, false, false, false], + [ + false, false, false, false, false, false, false, false, false, + ], ), ( Operation::CreateIndex { @@ -1269,7 +1439,7 @@ mod tests { removed_indices: vec![index0.clone()], }, // Will only conflict with operations that modify row ids. - [false, false, false, false, true, true, false, false], + [false, false, false, false, true, true, false, false, false], ), ( // Rewrite that affects different fragments @@ -1280,7 +1450,7 @@ mod tests { }], rewritten_indices: Vec::new(), }, - [false, true, false, true, true, false, false, true], + [false, true, false, true, true, false, false, true, false], ), ( // Rewrite that affects the same fragments @@ -1291,7 +1461,7 @@ mod tests { }], rewritten_indices: Vec::new(), }, - [false, true, true, true, true, true, false, true], + [false, true, true, true, true, true, false, true, false], ), ( Operation::Merge { @@ -1299,12 +1469,12 @@ mod tests { schema: Schema::default(), }, // Merge conflicts with everything except CreateIndex and ReserveFragments. - [true, false, true, true, true, true, false, true], + [true, false, true, true, true, true, false, true, false], ), ( Operation::ReserveFragments { num_fragments: 2 }, // ReserveFragments only conflicts with Overwrite and Restore. - [false, false, false, false, true, false, false, false], + [false, false, false, false, true, false, false, false, false], ), ( Operation::Update { @@ -1313,7 +1483,60 @@ mod tests { removed_fragment_ids: vec![], new_fragments: vec![fragment2.clone()], }, - [false, false, true, true, true, true, false, true], + [false, false, true, true, true, true, false, true, false], + ), + ( + // Update config that should not conflict with anything + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "other-key".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [ + false, false, false, false, false, false, false, false, false, + ], + ), + ( + // Update config that conflicts with key being upserted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "lance.test".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [false, false, false, false, false, false, false, false, true], + ), + ( + // Update config that conflicts with key being deleted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "remove-key".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [false, false, false, false, false, false, false, false, true], + ), + ( + // Delete config keys currently being deleted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: None, + delete_keys: Some(vec!["remove-key".to_string()]), + }, + [ + false, false, false, false, false, false, false, false, false, + ], + ), + ( + // Delete config keys currently being upserted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: None, + delete_keys: Some(vec!["lance.test".to_string()]), + }, + [false, false, false, false, false, false, false, false, true], ), ]; diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index cb03eceeff..ee9aa844bd 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -916,6 +916,127 @@ mod tests { } } + async fn get_empty_dataset() -> Dataset { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + + Dataset::write( + RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()), + test_uri, + None, + ) + .await + .unwrap() + } + + #[tokio::test] + async fn test_good_concurrent_config_writes() { + let dataset = get_empty_dataset().await; + + // Test successful concurrent insert config operations + let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"] + .iter() + .map(|key| { + let mut dataset = dataset.clone(); + tokio::spawn(async move { + dataset + .update_config(vec![(key.to_string(), "value".to_string())]) + .await + }) + }) + .collect(); + let results = join_all(futures).await; + + // Assert all succeeded + for result in results { + assert!(matches!(result, Ok(Ok(_))), "{:?}", result); + } + + let dataset = dataset.checkout_version(6).await.unwrap(); + assert_eq!(dataset.manifest.config.len(), 5); + + dataset.validate().await.unwrap(); + + // Test successful concurrent delete operations. If multiple delete + // operations attempt to delete the same key, they are all successful. + let futures: Vec<_> = ["key1", "key1", "key1", "key2", "key2"] + .iter() + .map(|key| { + let mut dataset = dataset.clone(); + tokio::spawn(async move { dataset.delete_config_keys(&[key]).await }) + }) + .collect(); + let results = join_all(futures).await; + + // Assert all succeeded + for result in results { + assert!(matches!(result, Ok(Ok(_))), "{:?}", result); + } + + let dataset = dataset.checkout_version(11).await.unwrap(); + + // There are now two fewer keys + assert_eq!(dataset.manifest.config.len(), 3); + + dataset.validate().await.unwrap() + } + + #[tokio::test] + async fn test_bad_concurrent_config_writes() { + // If two concurrent insert config operations occur for the same key, a + // `CommitConflict` should be returned + let dataset = get_empty_dataset().await; + + let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"] + .iter() + .map(|key| { + let mut dataset = dataset.clone(); + tokio::spawn(async move { + dataset + .update_config(vec![(key.to_string(), "value".to_string())]) + .await + }) + }) + .collect(); + + let results = join_all(futures).await; + + // Assert that either the first or the second operation fails + let mut first_operation_failed = false; + let error_fragment = "Commit conflict for version"; + for (i, result) in results.into_iter().enumerate() { + match i { + 0 => { + if !matches!(result, Ok(Ok(_))) { + first_operation_failed = true; + assert!(result + .unwrap() + .err() + .unwrap() + .to_string() + .contains(error_fragment)); + } + } + 1 => match first_operation_failed { + true => assert!(matches!(result, Ok(Ok(_))), "{:?}", result), + false => assert!(result + .unwrap() + .err() + .unwrap() + .to_string() + .contains(error_fragment)), + }, + _ => assert!(matches!(result, Ok(Ok(_))), "{:?}", result), + } + } + } + #[test] fn test_fix_schema() { // Manifest has a fragment with no fields in use diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 170d20d0a6..6621798ec0 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -110,7 +110,11 @@ impl TestDatasetGenerator { } } - let operation = Operation::Overwrite { fragments, schema }; + let operation = Operation::Overwrite { + fragments, + schema, + config_upsert_values: None, + }; let registry = Arc::new(ObjectStoreRegistry::default()); Dataset::commit(