Skip to content

Commit

Permalink
feat: add table config (#2820)
Browse files Browse the repository at this point in the history
Closes #2200 

#2200 references the concept of "table metadata". This PR uses the name
"config" to avoid potential confusion with other uses of the word
"metadata" throughout the Lance format.

This PR introduces:
- A new `table.proto` field called `config`.
- New `Dataset` methods: `update_config` and `delete_config_keys`.
- `config` field in `Manifest` with public methods for updating and
deleting.
- A new transaction operation `UpdateConfig` along with conflict logic
that returns an error if an operation mutates a key that is being
upserted by another operation.
- A new writer feature flag called `FLAG_TABLE_CONFIG`.
- Unit tests for new `Dataset` methods, concurrent config updaters, and
conflict resolution logic.

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
dsgibbons and wjones127 authored Oct 14, 2024
1 parent 8f95fbe commit 5c8f565
Show file tree
Hide file tree
Showing 12 changed files with 572 additions and 44 deletions.
9 changes: 9 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ message Manifest {
// * 1: deletion files are present
// * 2: move_stable_row_ids: row IDs are tracked and stable after move operations
// (such as compaction), but not updates.
// * 4: use v2 format (deprecated)
// * 8: table config is present
uint64 reader_feature_flags = 9;

// Feature flags for writers.
Expand Down Expand Up @@ -137,6 +139,13 @@ message Manifest {
//
// This specifies what format is used to store the data files.
DataStorageFormat data_format = 15;

// Table config.
//
// Keys with the prefix "lance." are reserved for the Lance library. Other
// libraries may wish to similarly prefix their configuration keys
// appropriately.
map<string, string> config = 16;
} // Manifest

// Auxiliary Data attached to a version.
Expand Down
9 changes: 9 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ message Transaction {
repeated lance.file.Field schema = 2;
// Schema metadata.
map<string, bytes> schema_metadata = 3;
// Key-value pairs to merge with existing config.
map<string, string> config_upsert_values = 4;
}

// Add or replace a new secondary index.
Expand Down Expand Up @@ -156,6 +158,12 @@ message Transaction {
// The new fragments where updated rows have been moved to.
repeated DataFragment new_fragments = 3;
}

// An operation that updates the table config.
message UpdateConfig {
// Key-value pairs to upsert (insert, or overwrite if the key already
// exists) into the table config.
map<string, string> upsert_values = 1;
// Keys to delete from the table config.
repeated string delete_keys = 2;
}

// The operation of this transaction.
oneof operation {
Expand All @@ -169,5 +177,6 @@ message Transaction {
ReserveFragments reserve_fragments = 107;
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
}
}
6 changes: 5 additions & 1 deletion python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,11 @@ impl Operation {
) -> PyResult<Self> {
let schema = convert_schema(&schema.0)?;
let fragments = into_fragments(fragments);
let op = LanceOperation::Overwrite { fragments, schema };
let op = LanceOperation::Overwrite {
fragments,
schema,
config_upsert_values: None,
};
Ok(Self(op))
}

Expand Down
4 changes: 2 additions & 2 deletions rust/lance-table/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# lance-file
# lance-table

`lance-file` is an internal sub-crate, containing readers and writers for the
`lance-table` is an internal sub-crate for the
[Lance table format](https://lancedb.github.io/lance/format.html).

**Important Note**: This crate is **not intended for external usage**.
17 changes: 14 additions & 3 deletions rust/lance-table/src/feature_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ pub const FLAG_DELETION_FILES: u64 = 1;
pub const FLAG_MOVE_STABLE_ROW_IDS: u64 = 2;
/// Files are written with the new v2 format (this flag is no longer used)
pub const FLAG_USE_V2_FORMAT_DEPRECATED: u64 = 4;
/// Table config is present
pub const FLAG_TABLE_CONFIG: u64 = 8;
/// The first bit that is unknown as a feature flag
pub const FLAG_UNKNOWN: u64 = 8;
pub const FLAG_UNKNOWN: u64 = 16;

/// Set the reader and writer feature flags in the manifest based on the contents of the manifest.
pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) -> Result<()> {
Expand Down Expand Up @@ -55,6 +57,11 @@ pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool)
manifest.writer_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS;
}

// Set the table config flag if any table config entries are present
if !manifest.config.is_empty() {
manifest.writer_feature_flags |= FLAG_TABLE_CONFIG;
}

Ok(())
}

Expand All @@ -81,7 +88,9 @@ mod tests {
assert!(can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED));
assert!(can_read_dataset(
super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS
super::FLAG_DELETION_FILES
| super::FLAG_MOVE_STABLE_ROW_IDS
| super::FLAG_USE_V2_FORMAT_DEPRECATED
));
assert!(!can_read_dataset(super::FLAG_UNKNOWN));
}
Expand All @@ -91,11 +100,13 @@ mod tests {
assert!(can_write_dataset(0));
assert!(can_write_dataset(super::FLAG_DELETION_FILES));
assert!(can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED));
assert!(can_write_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED));
assert!(can_write_dataset(super::FLAG_TABLE_CONFIG));
assert!(can_write_dataset(
super::FLAG_DELETION_FILES
| super::FLAG_MOVE_STABLE_ROW_IDS
| super::FLAG_USE_V2_FORMAT_DEPRECATED
| super::FLAG_TABLE_CONFIG
));
assert!(!can_write_dataset(super::FLAG_UNKNOWN));
}
Expand Down
47 changes: 47 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -77,6 +78,9 @@ pub struct Manifest {

/// The storage format of the data files.
pub data_storage_format: DataStorageFormat,

/// Table configuration.
pub config: HashMap<String, String>,
}

fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
Expand All @@ -99,6 +103,7 @@ impl Manifest {
data_storage_format: DataStorageFormat,
) -> Self {
let fragment_offsets = compute_fragment_offsets(&fragments);

Self {
schema,
version: 1,
Expand All @@ -115,6 +120,7 @@ impl Manifest {
fragment_offsets,
next_row_id: 0,
data_storage_format,
config: HashMap::new(),
}
}

Expand All @@ -141,6 +147,7 @@ impl Manifest {
fragment_offsets,
next_row_id: previous.next_row_id,
data_storage_format: previous.data_storage_format.clone(),
config: previous.config.clone(),
}
}

Expand All @@ -160,6 +167,17 @@ impl Manifest {
self.timestamp_nanos = nanos;
}

/// Merge key-value pairs into the `config`.
///
/// Every pair yielded by `upsert_values` is inserted into the existing map.
/// A key that is already present has its value overwritten; keys that are
/// not mentioned in `upsert_values` are left untouched.
pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
self.config.extend(upsert_values);
}

/// Delete `config` keys using a slice of keys.
///
/// Keys listed in `delete_keys` that are not present in the config are
/// silently ignored.
pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
    // Remove each requested key with a single hash lookup instead of
    // scanning `delete_keys` once for every entry stored in the config.
    for key in delete_keys {
        self.config.remove(*key);
    }
}

/// Check the current fragment list and update the high water mark
pub fn update_max_fragment_id(&mut self) {
let max_fragment_id = self
Expand Down Expand Up @@ -471,6 +489,7 @@ impl TryFrom<pb::Manifest> for Manifest {
fragment_offsets,
next_row_id: p.next_row_id,
data_storage_format,
config: p.config,
})
}
}
Expand Down Expand Up @@ -513,6 +532,7 @@ impl From<&Manifest> for pb::Manifest {
file_format: m.data_storage_format.file_format.clone(),
version: m.data_storage_format.version.clone(),
}),
config: m.config.clone(),
}
}
}
Expand Down Expand Up @@ -692,4 +712,31 @@ mod tests {

assert_eq!(manifest.max_field_id(), 43);
}

#[test]
fn test_config() {
    // A single non-nullable Int64 column is enough: this test only exercises
    // the manifest's `config` map.
    let field = ArrowField::new("a", arrow_schema::DataType::Int64, false);
    let schema = Schema::try_from(&ArrowSchema::new(vec![field])).unwrap();
    let fragments = Arc::new(vec![
        Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
        Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
        Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
    ]);
    let mut manifest = Manifest::new(schema, fragments, DataStorageFormat::default());

    let mut expected = HashMap::from([
        ("lance:test".to_string(), "value".to_string()),
        ("other-key".to_string(), "other-value".to_string()),
    ]);

    // Upserting into an empty config yields exactly the upserted pairs.
    manifest.update_config(expected.clone());
    assert_eq!(manifest.config, expected);

    // Deleting one key leaves the remaining entry untouched.
    manifest.delete_config_keys(&["other-key"]);
    expected.remove("other-key");
    assert_eq!(manifest.config, expected);
}
}
4 changes: 4 additions & 0 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ mod test {
let arrow_schema =
ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
let schema = Schema::try_from(&arrow_schema).unwrap();

let mut config = HashMap::new();
config.insert("key".to_string(), "value".to_string());

let mut manifest = Manifest::new(schema, Arc::new(vec![]), DataStorageFormat::default());
let pos = write_manifest(&mut writer, &mut manifest, None)
.await
Expand Down
96 changes: 95 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,11 @@ impl Dataset {
.await?;

let operation = match params.mode {
WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { schema, fragments },
WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite {
schema,
fragments,
config_upsert_values: None,
},
WriteMode::Append => Operation::Append { fragments },
};

Expand Down Expand Up @@ -1505,6 +1509,63 @@ impl Dataset {
let stream = Box::new(stream);
self.merge_impl(stream, left_on, right_on).await
}

/// Upsert key-value pairs into the table config.
///
/// Commits an `UpdateConfig` transaction (with no keys to delete) against the
/// current manifest version and, on success, replaces the in-memory manifest
/// with the one returned by the commit.
pub async fn update_config(
    &mut self,
    upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
    let operation = Operation::UpdateConfig {
        upsert_values: Some(upsert_values.into_iter().collect::<HashMap<_, _>>()),
        delete_keys: None,
    };
    let transaction = Transaction::new(self.manifest.version, operation, None);

    let committed = commit_transaction(
        self,
        &self.object_store,
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;

    // Only swap in the new manifest once the commit has succeeded.
    self.manifest = Arc::new(committed);
    Ok(())
}

/// Remove the given keys from the table config.
///
/// Commits an `UpdateConfig` transaction (with no values to upsert) against
/// the current manifest version and, on success, replaces the in-memory
/// manifest with the one returned by the commit.
pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
    // The operation owns its keys, so copy each borrowed key into a `String`.
    let owned_keys: Vec<String> = delete_keys.iter().map(|key| (*key).to_owned()).collect();
    let operation = Operation::UpdateConfig {
        upsert_values: None,
        delete_keys: Some(owned_keys),
    };
    let transaction = Transaction::new(self.manifest.version, operation, None);

    let committed = commit_transaction(
        self,
        &self.object_store,
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;

    // Only swap in the new manifest once the commit has succeeded.
    self.manifest = Arc::new(committed);
    Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -2802,6 +2863,7 @@ mod tests {
let operation = Operation::Overwrite {
fragments: vec![],
schema,
config_upsert_values: None,
};
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
Expand Down Expand Up @@ -3237,6 +3299,38 @@ mod tests {
assert!(fragments[0].metadata.deletion_file.is_some());
}

#[rstest]
#[tokio::test]
async fn test_update_config() {
    // Write a small single-column dataset into a temporary directory.
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let field = ArrowField::new("i", DataType::UInt32, false);
    let schema = Arc::new(ArrowSchema::new(vec![field]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from_iter_values(0..100))],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

    let mut expected = HashMap::from([
        ("lance:test".to_string(), "value".to_string()),
        ("other-key".to_string(), "other-value".to_string()),
    ]);

    // After the upsert, the committed manifest holds exactly the new pairs.
    dataset.update_config(expected.clone()).await.unwrap();
    assert_eq!(dataset.manifest.config, expected);

    // Deleting one key leaves the other entry in place.
    dataset.delete_config_keys(&["other-key"]).await.unwrap();
    expected.remove("other-key");
    assert_eq!(dataset.manifest.config, expected);
}

#[rstest]
#[tokio::test]
async fn test_tag(
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,7 @@ mod tests {
let op = Operation::Overwrite {
schema: schema.clone(),
fragments,
config_upsert_values: None,
};

let registry = Arc::new(ObjectStoreRegistry::default());
Expand Down Expand Up @@ -2457,6 +2458,7 @@ mod tests {
let op = Operation::Overwrite {
fragments: vec![new_fragment],
schema: full_schema.clone(),
config_upsert_values: None,
};

let registry = Arc::new(ObjectStoreRegistry::default());
Expand Down
Loading

0 comments on commit 5c8f565

Please sign in to comment.