Skip to content

Commit

Permalink
wip: rearrange traits
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Aug 15, 2024
1 parent b2be254 commit 42316f5
Show file tree
Hide file tree
Showing 6 changed files with 623 additions and 6 deletions.
30 changes: 24 additions & 6 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{ops::Range, sync::Arc};
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_arrow::DataTypeExt;
use lance_file::{version::LanceFileVersion, writer::ManifestProvider};
use object_store::path::Path;
Expand All @@ -22,7 +23,7 @@ use lance_io::{
utils::read_message,

Check warning on line 23 in rust/lance-table/src/io/manifest.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/io/manifest.rs
};

use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC};
use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION};

/// Read Manifest on URI.
///
Expand Down Expand Up @@ -116,10 +117,10 @@ pub async fn read_manifest_indexes(
async fn do_write_manifest(
writer: &mut dyn Writer,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
indices: &[Index],
) -> Result<usize> {
// Write indices if presented.
if let Some(indices) = indices.as_ref() {
if !indices.is_empty() {
let section = pb::IndexSection {
indices: indices.iter().map(|i| i.into()).collect(),
};
Expand All @@ -134,7 +135,7 @@ async fn do_write_manifest(
pub async fn write_manifest(
writer: &mut dyn Writer,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
indices: &[Index],
) -> Result<usize> {
// Write dictionary values.
let max_field_id = manifest.schema.max_field_id().unwrap_or(-1);
Expand Down Expand Up @@ -187,6 +188,23 @@ pub async fn write_manifest(
do_write_manifest(writer, manifest, indices).await
}

pub fn write_manifest_file_to_path<'a>(
object_store: &'a ObjectStore,
manifest: &'a mut Manifest,
indices: &'a [Index],
path: &'a Path,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async {
let mut object_writer = ObjectWriter::new(object_store, path).await?;
let pos = write_manifest(&mut object_writer, manifest, indices).await?;
object_writer
.write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
.await?;
object_writer.shutdown().await?;
Ok(())
})
}

/// Implementation of ManifestProvider that describes a Lance file by writing
/// a manifest that contains nothing but default fields and the schema
pub struct ManifestDescribing {}
Expand All @@ -202,7 +220,7 @@ impl ManifestProvider for ManifestDescribing {
Arc::new(vec![]),
DataStorageFormat::new(LanceFileVersion::Legacy),
);
let pos = do_write_manifest(object_writer, &mut manifest, None).await?;
let pos = do_write_manifest(object_writer, &mut manifest, &[]).await?;
Ok(Some(pos))
}
}
Expand Down Expand Up @@ -244,7 +262,7 @@ mod test {
ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let mut manifest = Manifest::new(schema, Arc::new(vec![]), DataStorageFormat::default());
let pos = write_manifest(&mut writer, &mut manifest, None)
let pos = write_manifest(&mut writer, &mut manifest, &[])
.await
.unwrap();
writer
Expand Down
1 change: 1 addition & 0 deletions rust/lance-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
pub mod feature_flags;
pub mod format;
pub mod io;
pub mod manifest_store;
pub mod rowids;
pub mod utils;
97 changes: 97 additions & 0 deletions rust/lance-table/src/manifest_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! [ManifestStore] encapsulates the logic for reading, writing and managing
//! manifest files. Different implementations may be used, depending on the
//! capabilities of the underlying storage system.

use futures::future::Future;

use futures::stream::BoxStream;
use lance_core::{Error, Result};
use lance_io::traits::Reader;

Check warning on line 13 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
use crate::format::{Index, Manifest};

pub mod legacy;
#[cfg(feature = "dynamodb")]
pub mod dynamodb;

const MANIFEST_EXTENSION: &str = "manifest";

/// A store of manifests. This provides fast access to the latest version
/// of the dataset and allows for listing and opening older versions.
pub trait ManifestStore {

Check warning on line 24 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
/// Get the latest version of the dataset.
fn latest_version(&self) -> impl Future<Output = Result<ManifestVersion>>;

/// Open the latest manifest file.
fn open_latest_manifest(&self) -> impl Future<Output = Result<Box<dyn Reader>>>;

/// Open the manifest file for the given version.

Check warning on line 31 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
///
/// Should use the provided size if available to avoid an extra HEAD request.
fn open_manifest(&self, version: impl Into<ManifestVersion>) -> impl Future<Output = Result<Box<dyn Reader>>>;

/// List all the versions of the dataset.
///
/// This should return them in descending order.
fn list_versions(&self) -> BoxStream<Result<ManifestVersion>>;

/// Try to commit the given manifest as the given version.
///
/// If the version already exists, this should return an error, even if
/// the version was created by a concurrent process.
///

Check warning on line 45 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
/// Any temporary files created during the commit should be cleaned up
/// if the commit fails.
///
/// The `manifest` is mutable because the offsets to certain internal
/// structures are updated during the writing process.
fn try_commit(
&self,
manifest: &mut Manifest,
indices: &[Index],
) -> impl Future<Output = std::result::Result<(), CommitError>>;

// TODO: what about cleanup?
}

pub struct ManifestVersion {
version: u64,
known_size: Option<u64>,
}

impl From<u64> for ManifestVersion {
fn from(version: u64) -> Self {
Self {
version,
known_size: None,
}
}

Check warning on line 71 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
}


/// Errors that can occur when committing a manifest.
#[derive(Debug)]
pub enum CommitError {
/// Another transaction has already been written to the path
CommitConflict,
/// Something else went wrong
OtherError(Error),
}

impl From<Error> for CommitError {
fn from(e: Error) -> Self {
Self::OtherError(e)
}
}

// Goal: make the paths opaque, so that the store implementation can choose how
// the paths are represented.

// Goal 2: separate idea of commit handler (what happens when we write the manifest)
// from the idea of the store (how we read the manifests). Allow customizing both.

// This is really just a cleaned up version of CommitHandler. We can provide
// an adapter for now.
3 changes: 3 additions & 0 deletions rust/lance-table/src/manifest_store/dynamodb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// SPDX-License-Identifier: Apache-2.0

Check warning on line 1 in rust/lance-table/src/manifest_store/dynamodb.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store/dynamodb.rs
// SPDX-FileCopyrightText: Copyright The Lance Authors

Loading

0 comments on commit 42316f5

Please sign in to comment.