From 42316f5ae82beb6e511df340f1ddaf18fade1e53 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 15 Aug 2024 16:44:08 -0700 Subject: [PATCH] wip: rearrange traits --- rust/lance-table/src/io/manifest.rs | 30 +- rust/lance-table/src/lib.rs | 1 + rust/lance-table/src/manifest_store.rs | 97 +++++++ .../src/manifest_store/dynamodb.rs | 3 + rust/lance-table/src/manifest_store/legacy.rs | 271 ++++++++++++++++++ .../src/manifest_store/legacy/commit.rs | 227 +++++++++++++++ 6 files changed, 623 insertions(+), 6 deletions(-) create mode 100644 rust/lance-table/src/manifest_store.rs create mode 100644 rust/lance-table/src/manifest_store/dynamodb.rs create mode 100644 rust/lance-table/src/manifest_store/legacy.rs create mode 100644 rust/lance-table/src/manifest_store/legacy/commit.rs diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index c92eb5bdf8..a0b64170c2 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -6,6 +6,7 @@ use std::{ops::Range, sync::Arc}; use async_trait::async_trait; use byteorder::{ByteOrder, LittleEndian}; use bytes::{Bytes, BytesMut}; +use futures::future::BoxFuture; use lance_arrow::DataTypeExt; use lance_file::{version::LanceFileVersion, writer::ManifestProvider}; use object_store::path::Path; @@ -22,7 +23,7 @@ use lance_io::{ utils::read_message, }; -use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC}; +use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION}; /// Read Manifest on URI. /// @@ -116,10 +117,10 @@ pub async fn read_manifest_indexes( async fn do_write_manifest( writer: &mut dyn Writer, manifest: &mut Manifest, - indices: Option>, + indices: &[Index], ) -> Result { // Write indices if presented. - if let Some(indices) = indices.as_ref() { + if !indices.is_empty() { let section = pb::IndexSection { indices: indices.iter().map(|i| i.into()).collect(), }; @@ -134,7 +135,7 @@ async fn do_write_manifest( pub async fn write_manifest( writer: &mut dyn Writer, manifest: &mut Manifest, - indices: Option>, + indices: &[Index], ) -> Result { // Write dictionary values. let max_field_id = manifest.schema.max_field_id().unwrap_or(-1); @@ -187,6 +188,23 @@ pub async fn write_manifest( do_write_manifest(writer, manifest, indices).await } +pub fn write_manifest_file_to_path<'a>( + object_store: &'a ObjectStore, + manifest: &'a mut Manifest, + indices: &'a [Index], + path: &'a Path, +) -> BoxFuture<'a, Result<()>> { + Box::pin(async { + let mut object_writer = ObjectWriter::new(object_store, path).await?; + let pos = write_manifest(&mut object_writer, manifest, indices).await?; + object_writer + .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) + .await?; + object_writer.shutdown().await?; + Ok(()) + }) +} + /// Implementation of ManifestProvider that describes a Lance file by writing /// a manifest that contains nothing but default fields and the schema pub struct ManifestDescribing {} @@ -202,7 +220,7 @@ impl ManifestProvider for ManifestDescribing { Arc::new(vec![]), DataStorageFormat::new(LanceFileVersion::Legacy), ); - let pos = do_write_manifest(object_writer, &mut manifest, None).await?; + let pos = do_write_manifest(object_writer, &mut manifest, &[]).await?; Ok(Some(pos)) } } @@ -244,7 +262,7 @@ mod test { ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]); let schema = Schema::try_from(&arrow_schema).unwrap(); let mut manifest = Manifest::new(schema, Arc::new(vec![]), DataStorageFormat::default()); - let pos = write_manifest(&mut writer, &mut manifest, None) + let pos = write_manifest(&mut writer, &mut manifest, &[]) .await .unwrap(); writer diff --git a/rust/lance-table/src/lib.rs b/rust/lance-table/src/lib.rs index ebe892ba53..4aa0089704 100644 --- a/rust/lance-table/src/lib.rs +++ b/rust/lance-table/src/lib.rs @@ -4,5 +4,6 @@ pub mod feature_flags; pub mod format; pub mod io; +pub mod manifest_store; pub mod rowids; pub mod utils; diff --git a/rust/lance-table/src/manifest_store.rs b/rust/lance-table/src/manifest_store.rs new file mode 100644 index 0000000000..be1554b826 --- /dev/null +++ b/rust/lance-table/src/manifest_store.rs @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! [ManifestStore] encapsulates the logic for reading, writing and managing +//! manifest files. Different implementations may be used, depending on the +//! capabilities of the underlying storage system. + +use futures::future::Future; + +use futures::stream::BoxStream; +use lance_core::{Error, Result}; +use lance_io::traits::Reader; + +use crate::format::{Index, Manifest}; + +pub mod legacy; +#[cfg(feature = "dynamodb")] +pub mod dynamodb; + +const MANIFEST_EXTENSION: &str = "manifest"; + +/// A store of manifests. This provides fast access to the latest version +/// of the dataset and allows for listing and opening older versions. +pub trait ManifestStore { + /// Get the latest version of the dataset. + fn latest_version(&self) -> impl Future>; + + /// Open the latest manifest file. + fn open_latest_manifest(&self) -> impl Future>>; + + /// Open the manifest file for the given version. + /// + /// Should use the provided size if available to avoid an extra HEAD request. + fn open_manifest(&self, version: impl Into) -> impl Future>>; + + /// List all the versions of the dataset. + /// + /// This should return them in descending order. + fn list_versions(&self) -> BoxStream>; + + /// Try to commit the given manifest as the given version. + /// + /// If the version already exists, this should return an error, even if + /// the version was created by a concurrent process. + /// + /// Any temporary files created during the commit should be cleaned up + /// if the commit fails. + /// + /// The `manifest` is mutable because the offsets to certain internal + /// structures are updated during the writing process. + fn try_commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + ) -> impl Future>; + + // TODO: what about cleanup? +} + +pub struct ManifestVersion { + version: u64, + known_size: Option, +} + +impl From for ManifestVersion { + fn from(version: u64) -> Self { + Self { + version, + known_size: None, + } + } +} + + +/// Errors that can occur when committing a manifest. +#[derive(Debug)] +pub enum CommitError { + /// Another transaction has already been written to the path + CommitConflict, + /// Something else went wrong + OtherError(Error), +} + +impl From for CommitError { + fn from(e: Error) -> Self { + Self::OtherError(e) + } +} + +// Goal: make the paths opaque, so that the store implementation can choose how +// the paths are represented. + +// Goal 2: separate idea of commit handler (what happens when we write the manifest) +// from the idea of the store (how we read the manifests). Allow customizing both. + +// This is really just a cleaned up version of CommitHandler. We can provide +// an adapter for now. \ No newline at end of file diff --git a/rust/lance-table/src/manifest_store/dynamodb.rs b/rust/lance-table/src/manifest_store/dynamodb.rs new file mode 100644 index 0000000000..c59b8cf729 --- /dev/null +++ b/rust/lance-table/src/manifest_store/dynamodb.rs @@ -0,0 +1,3 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + diff --git a/rust/lance-table/src/manifest_store/legacy.rs b/rust/lance-table/src/manifest_store/legacy.rs new file mode 100644 index 0000000000..f3a34874e4 --- /dev/null +++ b/rust/lance-table/src/manifest_store/legacy.rs @@ -0,0 +1,271 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +//! # Legacy Manifest Store +//! +//! This module contains the legacy manifest store implementation. +use std::io; + +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use lance_core::{Error, Result}; +use lance_io::{object_store::ObjectStore, traits::Reader}; +use object_store::path::Path; +use snafu::{location, Location}; +use std::{fmt::Debug, fs::DirEntry}; + +use crate::{format::{Index, Manifest}, io::manifest::write_manifest_file_to_path}; + +use super::{CommitError, ManifestStore, ManifestVersion, MANIFEST_EXTENSION}; + +const VERSIONS_DIR: &str = "_versions"; + +mod commit; + +pub use commit::{ + CommitHandler, CommitLease, CommitLock, ManifestWriter, RenameCommitHandler, + UnsafeCommitHandler, +}; + +/// A manifest store that stores manifests in the `_versions` directory and +/// uses the `{version}.manifest` file name format. +/// +/// It uses a pluggable commit handler to allow for different commit strategies. +pub struct LegacyManifestStore<'a> { + base: &'a Path, + object_store: &'a ObjectStore, + commit_handler: &'a dyn CommitHandler, +} + +impl<'a> LegacyManifestStore<'a> { + pub fn new( + base: &'a Path, + object_store: &'a ObjectStore, + commit_handler: &'a dyn CommitHandler, + ) -> Self { + Self { + base, + object_store, + commit_handler, + } + } +} + +impl<'a> ManifestStore for LegacyManifestStore<'a> { + async fn latest_version(&self) -> Result { + let current_path = current_manifest_path(&self.object_store, &self.base).await?; + Ok(current_path.into()) + } + + async fn open_latest_manifest(&self) -> Result> { + let location = current_manifest_path(&self.object_store, &self.base).await?; + // By re-using the size from the list operation, we avoid an extra HEAD request. + if let Some(size) = location.size { + self.object_store + .open_with_size(&location.path, size as usize) + .await + } else { + self.object_store.open(&location.path).await + } + } + + async fn open_manifest(&self, version: impl Into) -> Result> { + let version = version.into(); + + let path = manifest_path(&Path::default(), version.version); + if let Some(size) = version.known_size { + self.object_store.open_with_size(&path, size as usize).await + } else { + self.object_store.open(&path).await + } + } + + fn list_versions(&self) -> BoxStream> { + // Because of lack of order guarantees, this won't be a true stream. + // We have to collect all the versions, and then sort. + let future = async { + let mut versions = self + .object_store + .read_dir_all(&self.base.child(VERSIONS_DIR), None) + .await? + .try_filter_map(|obj_meta| { + if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) { + let version = obj_meta + .location + .filename() + .and_then(|filename| filename.split_once('.')) + .and_then(|(version_str, _)| version_str.parse::().ok()); + if let Some(version) = version { + let version = ManifestVersion { + version, + known_size: Some(obj_meta.size as u64), + }; + futures::future::ready(Ok(Some(version))) + } else { + futures::future::ready(Ok(None)) + } + } else { + futures::future::ready(Ok(None)) + } + }) + .try_collect::>() + .await?; + versions.sort_by_key(|location| location.version); + + Ok(versions.into_iter().rev()) + }; + + futures::stream::once(future) + .flat_map(|res| match res { + Ok(versions) => futures::stream::iter(versions).map(Ok).boxed(), + Err(err) => futures::stream::once(futures::future::ready(Err(err))).boxed(), + }) + .boxed() + } + + async fn try_commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + ) -> std::result::Result<(), CommitError> { + self.commit_handler + .commit( + manifest, + indices, + self.base, + self.object_store, + write_manifest_file_to_path, + ) + .await + } +} + +/// Get the manifest file path for a version. +pub fn manifest_path(base: &Path, version: u64) -> Path { + base.child(VERSIONS_DIR) + .child(format!("{version}.{MANIFEST_EXTENSION}")) +} + +// This is an optimized function that searches for the latest manifest. In +// object_store, list operations lookup metadata for each file listed. This +// method only gets the metadata for the found latest manifest. +fn current_manifest_local(base: &Path) -> std::io::Result> { + let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR)); + let entries = std::fs::read_dir(path)?; + + let mut latest_entry: Option<(u64, DirEntry)> = None; + + for entry in entries { + let entry = entry?; + let filename_raw = entry.file_name(); + let filename = filename_raw.to_string_lossy(); + if !filename.ends_with(MANIFEST_EXTENSION) { + // Need to ignore temporary files, such as + // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d + continue; + } + let Some(version) = filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()) + else { + continue; + }; + + if let Some((latest_version, _)) = &latest_entry { + if version > *latest_version { + latest_entry = Some((version, entry)); + } + } else { + latest_entry = Some((version, entry)); + } + } + + if let Some((version, entry)) = latest_entry { + let path = Path::from_filesystem_path(entry.path()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; + Ok(Some(ManifestLocation { + version, + path, + size: Some(entry.metadata()?.len()), + })) + } else { + Ok(None) + } +} + +#[derive(Debug)] +pub struct ManifestLocation { + /// The version the manifest corresponds to. + pub version: u64, + /// Path of the manifest file, relative to the table root. + pub path: Path, + /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. + pub size: Option, +} + +impl From for ManifestVersion { + fn from(location: ManifestLocation) -> Self { + Self { + version: location.version, + known_size: location.size, + } + } +} + +/// Get the latest manifest path +async fn current_manifest_path( + object_store: &ObjectStore, + base: &Path, +) -> Result { + if object_store.is_local() { + if let Ok(Some(location)) = current_manifest_local(base) { + return Ok(location); + } + } + + // We use `list_with_delimiter` to avoid listing the contents of child directories. + let manifest_files = object_store + .inner + .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) + .await?; + + let current = manifest_files + .objects + .into_iter() + .filter(|meta| { + meta.location.filename().is_some() + && meta + .location + .filename() + .unwrap() + .ends_with(MANIFEST_EXTENSION) + }) + .filter_map(|meta| { + let version = meta + .location + .filename() + .unwrap() + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok())?; + Some((version, meta)) + }) + .max_by_key(|(version, _)| *version); + + if let Some((version, meta)) = current { + Ok(ManifestLocation { + version, + path: meta.location, + size: Some(meta.size as u64), + }) + } else { + Err(Error::NotFound { + uri: manifest_path(base, 1).to_string(), + location: location!(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // TODO: tests +} diff --git a/rust/lance-table/src/manifest_store/legacy/commit.rs b/rust/lance-table/src/manifest_store/legacy/commit.rs new file mode 100644 index 0000000000..2b7cb46b77 --- /dev/null +++ b/rust/lance-table/src/manifest_store/legacy/commit.rs @@ -0,0 +1,227 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::{atomic::AtomicBool, Arc}; + +use futures::future::BoxFuture; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +use object_store::Error as ObjectStoreError; +use lance_core::Result; +use crate::{format::{Index, Manifest}, manifest_store::CommitError}; + +use super::manifest_path; + + +/// Function that writes the manifest to the object store. +pub type ManifestWriter = for<'a> fn( + object_store: &'a ObjectStore, + manifest: &'a mut Manifest, + indices: &'a [Index], + path: &'a Path, +) -> BoxFuture<'a, Result<()>>; + +#[async_trait::async_trait] +pub trait CommitHandler: std::fmt::Debug + Send + Sync { + /// Commit a manifest. + /// + /// This function should return an [CommitError::CommitConflict] if another + /// transaction has already been committed to the path. + async fn commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + ) -> std::result::Result<(), CommitError>; +} + +/// Whether we have issued a warning about using the unsafe commit handler. +static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false); + +/// A naive commit implementation that does not prevent conflicting writes. +/// +/// This will log a warning the first time it is used. +pub struct UnsafeCommitHandler; + +#[async_trait::async_trait] +impl CommitHandler for UnsafeCommitHandler { + async fn commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + ) -> std::result::Result<(), CommitError> { + // Log a one-time warning + if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { + WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed); + log::warn!( + "Using unsafe commit handler. Concurrent writes may result in data loss. \ + Consider providing a commit handler that prevents conflicting writes." + ); + } + + let version_path = manifest_path(base_path, manifest.version); + // Write the manifest naively + manifest_writer(object_store, manifest, indices, &version_path).await?; + + Ok(()) + } +} + +impl std::fmt::Debug for UnsafeCommitHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UnsafeCommitHandler").finish() + } +} + +/// A commit implementation that uses a lock to prevent conflicting writes. +#[async_trait::async_trait] +pub trait CommitLock: std::fmt::Debug { + type Lease: CommitLease; + + /// Attempt to lock the table for the given version. + /// + /// If it is already locked by another transaction, wait until it is unlocked. + /// Once it is unlocked, return [CommitError::CommitConflict] if the version + /// has already been committed. Otherwise, return the lock. + /// + /// To prevent poisoned locks, it's recommended to set a timeout on the lock + /// of at least 30 seconds. + /// + /// It is not required that the lock tracks the version. It is provided in + /// case the locking is handled by a catalog service that needs to know the + /// current version of the table. + async fn lock(&self, version: u64) -> std::result::Result; +} + +#[async_trait::async_trait] +pub trait CommitLease: Send + Sync { + /// Return the lease, indicating whether the commit was successful. + async fn release(&self, success: bool) -> std::result::Result<(), CommitError>; +} + +#[async_trait::async_trait] +impl CommitHandler for T { + async fn commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + ) -> std::result::Result<(), CommitError> { + let path = manifest_path(base_path, manifest.version); + // NOTE: once we have the lease we cannot use ? to return errors, since + // we must release the lease before returning. + let lease = self.lock(manifest.version).await?; + + // Head the location and make sure it's not already committed + match object_store.inner.head(&path).await { + Ok(_) => { + // The path already exists, so it's already committed + // Release the lock + lease.release(false).await?; + + return Err(CommitError::CommitConflict); + } + Err(ObjectStoreError::NotFound { .. }) => {} + Err(e) => { + // Something else went wrong + // Release the lock + lease.release(false).await?; + + return Err(CommitError::OtherError(e.into())); + } + } + let res = manifest_writer(object_store, manifest, indices, &path).await; + + // Release the lock + lease.release(res.is_ok()).await?; + + res.map_err(|err| err.into()) + } +} + +#[async_trait::async_trait] +impl CommitHandler for Arc { + async fn commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + ) -> std::result::Result<(), CommitError> { + self.as_ref() + .commit(manifest, indices, base_path, object_store, manifest_writer) + .await + } +} + +/// A commit implementation that uses a temporary path and renames the object. +/// +/// This only works for object stores that support atomic rename if not exist. +pub struct RenameCommitHandler; + +#[async_trait::async_trait] +impl CommitHandler for RenameCommitHandler { + async fn commit( + &self, + manifest: &mut Manifest, + indices: &[Index], + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + ) -> std::result::Result<(), CommitError> { + // Create a temporary object, then use `rename_if_not_exists` to commit. + // If failed, clean up the temporary object. + + let path = manifest_path(base_path, manifest.version); + + // Add .tmp_ prefix to the path + let mut parts: Vec<_> = path.parts().collect(); + // Add a UUID to the end of the filename to avoid conflicts + let uuid = uuid::Uuid::new_v4(); + let new_name = format!( + ".tmp_{}_{}", + parts.last().unwrap().as_ref(), + uuid.as_hyphenated() + ); + let _ = std::mem::replace(parts.last_mut().unwrap(), new_name.into()); + let tmp_path: Path = parts.into_iter().collect(); + + // Write the manifest to the temporary path + manifest_writer(object_store, manifest, indices, &tmp_path).await?; + + let res = match object_store + .inner + .rename_if_not_exists(&tmp_path, &path) + .await + { + Ok(_) => Ok(()), + Err(ObjectStoreError::AlreadyExists { .. }) => { + // Another transaction has already been committed + // Attempt to clean up temporary object, but ignore errors if we can't + let _ = object_store.delete(&tmp_path).await; + + return Err(CommitError::CommitConflict); + } + Err(e) => { + // Something else went wrong + return Err(CommitError::OtherError(e.into())); + } + }; + + res + } +} + +impl std::fmt::Debug for RenameCommitHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RenameCommitHandler").finish() + } +}