Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds support for detached commits #3028

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,7 @@ def commit(
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = False,
) -> LanceDataset:
"""Create a new version of dataset

Expand Down Expand Up @@ -2042,6 +2043,13 @@ def commit(
:meth:`migrate_manifest_paths_v2` method. Default is False. WARNING:
turning this on will make the dataset unreadable for older versions
of Lance (prior to 0.17.0).
detached : bool, optional
If True, then the commit will not be part of the dataset lineage. It will
never show up as the latest dataset and the only way to check it out in the
future will be to specifically check it out by version. The version will be
a random version that is only unique amongst detached commits. The caller
should store this somewhere as there will be no other way to obtain it in
the future.

Returns
-------
Expand Down Expand Up @@ -2079,15 +2087,18 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

_Dataset.commit(
new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
)
return LanceDataset(
base_uri, version=new_ds.version(), storage_options=storage_options
)
return LanceDataset(base_uri, storage_options=storage_options)

def validate(self):
"""
Expand Down
51 changes: 51 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2654,3 +2654,54 @@ def test_default_storage_version(tmp_path: Path):
sample_file = frag.to_json()["files"][0]
assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION


def test_no_detached_v1(tmp_path: Path):
    """A detached commit is rejected when the dataset was written without
    ``enable_v2_manifest_paths``; the error is expected to mention
    "v1 manifest paths"."""
    base = lance.write_dataset(pa.table({"x": [0]}), tmp_path)

    # Stage a one-row append and attempt to commit it as detached
    extra_rows = pa.table({"x": [1]})
    staged = lance.LanceFragment.create(base.uri, extra_rows)
    append_op = lance.LanceOperation.Append([staged])

    with pytest.raises(OSError, match="v1 manifest paths"):
        base.commit(base.uri, append_op, read_version=base.version, detached=True)


def test_detached_commits(tmp_path: Path):
    """Walk through the detached-commit workflow on a dataset that uses v2
    manifest paths: create one, confirm it stays out of the main lineage,
    check it out by version, and build a second detached commit on top."""
    # Detached versions are expected to have the most significant bit set
    detached_bit = 0x8000000000000000

    def stage_append(uri, value):
        # Write a single-row fragment under `uri` and wrap it in an Append op
        rows = pa.table({"x": [value]})
        fragment = lance.LanceFragment.create(uri, rows)
        return lance.LanceOperation.Append([fragment])

    trunk = lance.write_dataset(
        pa.table({"x": [0]}), tmp_path, enable_v2_manifest_paths=True
    )

    # First detached append, based on the initial version
    op = stage_append(trunk.uri, 1)
    side = trunk.commit(trunk.uri, op, read_version=trunk.version, detached=True)
    assert (side.version & detached_bit) != 0
    assert side.to_table() == pa.table({"x": [0, 1]})

    # Re-opening the dataset must not surface the detached commit
    trunk = lance.dataset(tmp_path)
    assert trunk.to_table() == pa.table({"x": [0]})

    # Regular commits to the dataset do not include the detached commit's rows
    trunk = lance.write_dataset(pa.table({"x": [2]}), tmp_path, mode="append")
    assert trunk.to_table() == pa.table({"x": [0, 2]})

    # The detached commit is still reachable by checking out its version
    side = trunk.checkout_version(side.version)
    assert side.to_table() == pa.table({"x": [0, 1]})

    # A detached commit can itself be the read version of another detached commit
    op = stage_append(side.uri, 3)
    side2 = trunk.commit(trunk.uri, op, read_version=side.version, detached=True)
    assert side2.to_table() == pa.table({"x": [0, 1, 3]})
34 changes: 24 additions & 10 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,7 @@ impl Dataset {
commit_lock: Option<&PyAny>,
storage_options: Option<HashMap<String, String>>,
enable_v2_manifest_paths: Option<bool>,
detached: Option<bool>,
) -> PyResult<Self> {
let object_store_params =
storage_options
Expand Down Expand Up @@ -1337,16 +1338,29 @@ impl Dataset {
let manifest = dataset.as_ref().map(|ds| ds.manifest());
validate_operation(manifest, &operation.0)?;
let object_store_registry = Arc::new(lance::io::ObjectStoreRegistry::default());
LanceDataset::commit(
dataset_uri,
operation.0,
read_version,
object_store_params,
commit_handler,
object_store_registry,
enable_v2_manifest_paths.unwrap_or(false),
)
.await
if detached.unwrap_or(false) {
LanceDataset::commit_detached(
dataset_uri,
operation.0,
read_version,
object_store_params,
commit_handler,
object_store_registry,
enable_v2_manifest_paths.unwrap_or(false),
)
.await
} else {
LanceDataset::commit(
dataset_uri,
operation.0,
read_version,
object_store_params,
commit_handler,
object_store_registry,
enable_v2_manifest_paths.unwrap_or(false),
)
.await
}
})?
.map_err(|e| PyIOError::new_err(e.to_string()))?;
Ok(Self {
Expand Down
5 changes: 4 additions & 1 deletion rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ mod manifest;

pub use fragment::*;
pub use index::Index;
pub use manifest::{DataStorageFormat, Manifest, SelfDescribingFileReader, WriterVersion};
pub use manifest::{
is_detached_version, DataStorageFormat, Manifest, SelfDescribingFileReader, WriterVersion,
DETACHED_VERSION_MASK,
};

use lance_core::{Error, Result};

Expand Down
7 changes: 7 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ pub struct Manifest {
pub config: HashMap<String, String>,
}

/// Mask selecting the most significant bit of a `u64` version number.
///
/// A version with this bit set is considered detached; see
/// [`is_detached_version`].
pub const DETACHED_VERSION_MASK: u64 = 1 << 63;

/// Reports whether `version` has the [`DETACHED_VERSION_MASK`] bit set.
pub fn is_detached_version(version: u64) -> bool {
    // The mask is a single bit, so "bit is set" is the same as "masked value
    // equals the mask".
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}

fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
fragments
.iter()
Expand Down
43 changes: 37 additions & 6 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ use {
std::time::{Duration, SystemTime},
};

use crate::format::{Index, Manifest};
use crate::format::{is_detached_version, Index, Manifest};

const VERSIONS_DIR: &str = "_versions";
const MANIFEST_EXTENSION: &str = "manifest";
const DETACHED_VERSION_PREFIX: &str = "d";

/// How manifest files should be named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -75,18 +76,30 @@ pub enum ManifestNamingScheme {
impl ManifestNamingScheme {
pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
let directory = base.child(VERSIONS_DIR);
match self {
Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
Self::V2 => {
let inverted_version = u64::MAX - version;
directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
if is_detached_version(version) {
// Detached versions should never show up first in a list operation, which
// means they need to come lexicographically after all attached manifest
// files, and so we add the prefix `d`. There is no need to invert the
// version number since detached versions are not part of the version sequence.
Comment on lines +80 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let directory = base.child(VERSIONS_DIR);
directory.child(format!(
"{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
))
} else {
match self {
Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
Self::V2 => {
let inverted_version = u64::MAX - version;
directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
}
}
}
}

pub fn parse_version(&self, filename: &str) -> Option<u64> {
let file_number = filename
.split_once('.')
// Detached versions will fail the `parse` step, which is ok.
.and_then(|(version_str, _)| version_str.parse::<u64>().ok());
match self {
Self::V1 => file_number,
Expand All @@ -95,6 +108,10 @@ impl ManifestNamingScheme {
}

pub fn detect_scheme(filename: &str) -> Option<Self> {
if filename.starts_with(DETACHED_VERSION_PREFIX) {
// Currently, detached versions must imply V2
return Some(Self::V2);
}
if filename.ends_with(MANIFEST_EXTENSION) {
const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
if filename.len() == V2_LEN {
Expand All @@ -108,6 +125,8 @@ impl ManifestNamingScheme {
}

pub fn detect_scheme_staging(filename: &str) -> Self {
// We shouldn't have to worry about detached versions here since there is no
// such thing as "detached" and "staged" at the same time.
if filename.chars().nth(20) == Some('.') {
Self::V2
} else {
Expand Down Expand Up @@ -462,6 +481,18 @@ async fn default_resolve_version(
version: u64,
object_store: &dyn OSObjectStore,
) -> Result<ManifestLocation> {
if is_detached_version(version) {
return Ok(ManifestLocation {
version,
// Detached versions are not supported with V1 naming scheme. If we need
// to support in the future we could use a different prefix (e.g. 'x' or something)
naming_scheme: ManifestNamingScheme::V2,
// Both V1 and V2 should give the same path for detached versions
path: ManifestNamingScheme::V2.manifest_path(base_path, version),
size: None,
});
}

// try V2, fallback to V1.
let scheme = ManifestNamingScheme::V2;
let path = scheme.manifest_path(base_path, version);
Expand Down
Loading
Loading