From cab1ef01d5e0ae26d452af865a72ba3aac07493d Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Mon, 21 Oct 2024 11:54:01 -0300 Subject: [PATCH] wip --- icechunk/examples/low_level_dataset.rs | 4 +- icechunk/src/change_set.rs | 40 ++- icechunk/src/conflict.rs | 275 ++++++++++++++++++++ icechunk/src/format/mod.rs | 8 +- icechunk/src/format/transaction_log.rs | 52 ++++ icechunk/src/lib.rs | 1 + icechunk/src/repository.rs | 334 ++++++++++++++++++++++++- icechunk/src/storage/caching.rs | 33 ++- icechunk/src/storage/logging.rs | 19 ++ icechunk/src/storage/mod.rs | 12 +- icechunk/src/storage/object_store.rs | 53 +++- icechunk/src/storage/s3.rs | 40 ++- 12 files changed, 850 insertions(+), 21 deletions(-) create mode 100644 icechunk/src/conflict.rs create mode 100644 icechunk/src/format/transaction_log.rs diff --git a/icechunk/examples/low_level_dataset.rs b/icechunk/examples/low_level_dataset.rs index 9e6b9e9c..a7a980d5 100644 --- a/icechunk/examples/low_level_dataset.rs +++ b/icechunk/examples/low_level_dataset.rs @@ -6,7 +6,7 @@ use icechunk::{ ChunkIndices, ChunkKeyEncoding, ChunkPayload, ChunkShape, Codec, DataType, FillValue, Path, StorageTransformer, UserAttributes, ZarrArrayMetadata, }, - storage::{MemCachingStorage, ObjectStorage}, + storage::ObjectStorage, zarr::StoreError, Repository, Storage, }; @@ -30,7 +30,7 @@ let mut ds = Repository::create(Arc::clone(&storage)); let storage: Arc = Arc::new(ObjectStorage::new_in_memory_store(None)); let mut ds = Repository::init( - Arc::new(MemCachingStorage::new(Arc::clone(&storage), 2, 2, 0, 0)), + Repository::add_in_mem_asset_caching(Arc::clone(&storage)), false, ) .await? diff --git a/icechunk/src/change_set.rs b/icechunk/src/change_set.rs index fa958f0b..101b1e98 100644 --- a/icechunk/src/change_set.rs +++ b/icechunk/src/change_set.rs @@ -19,19 +19,23 @@ use crate::{ #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] pub struct ChangeSet { - new_groups: HashMap, - new_arrays: HashMap, - updated_arrays: HashMap, + pub(crate) new_groups: HashMap, + pub(crate) new_arrays: HashMap, + pub(crate) updated_arrays: HashMap, // These paths may point to Arrays or Groups, // since both Groups and Arrays support UserAttributes - updated_attributes: HashMap>, + pub(crate) updated_attributes: HashMap>, // FIXME: issue with too many inline chunks kept in mem - set_chunks: HashMap>>, - deleted_groups: HashSet, - deleted_arrays: HashSet, + pub(crate) set_chunks: HashMap>>, + pub(crate) deleted_groups: HashSet, + pub(crate) deleted_arrays: HashSet, } impl ChangeSet { + pub fn written_arrays(&self) -> impl Iterator { + self.set_chunks.keys() + } + pub fn is_empty(&self) -> bool { self == &ChangeSet::default() } @@ -172,6 +176,12 @@ impl ChangeSet { self.set_chunks.get(&node_id).and_then(|h| h.get(coords)) } + pub fn unset_chunk_ref(&mut self, node_id: NodeId, coord: &ChunkIndices) { + self.set_chunks.entry(node_id).and_modify(|h| { + h.remove(coord); + }); + } + pub fn array_chunks_iterator( &self, node_id: NodeId, @@ -205,8 +215,22 @@ impl ChangeSet { }) } + pub fn all_modified_chunks_iterator( + &self, + ) -> impl Iterator)> + '_ { + self.set_chunks.iter().map(|(node, changes)| (node, changes.keys())) + } + pub fn new_nodes(&self) -> impl Iterator { - self.new_groups.keys().chain(self.new_arrays.keys()) + self.new_groups().chain(self.new_arrays()) + } + + pub fn new_groups(&self) -> impl Iterator { + self.new_groups.keys() + } + + pub fn new_arrays(&self) -> impl Iterator { + self.new_arrays.keys() } pub fn take_chunks( diff --git a/icechunk/src/conflict.rs b/icechunk/src/conflict.rs new file mode 100644 index 00000000..eb4ead19 --- /dev/null +++ b/icechunk/src/conflict.rs @@ -0,0 +1,275 @@ +use std::collections::HashMap; + +use async_trait::async_trait; + +use crate::{ + change_set::ChangeSet, + format::{transaction_log::TransactionLog, NodeId, Path}, + repository::RepositoryResult, + Repository, +}; + +#[derive(Debug, PartialEq, Eq)] +pub enum UnsolvableConflict { + NoFastForwardConfigured, + ChunksWrittenToDeletedArrays(Vec), + WriteToWrittenChunk(HashMap), + ConflictingUserAttributesUpdate(Vec), + ConflictingZarrMetadataUpdate(Vec), + ConflictingGroupCreation(Vec), + ConflictingArrayCreation(Vec), +} + +#[derive(Debug)] +pub enum ConflictResolution { + Patched(ChangeSet), + Failure { reason: Vec, unmodified: ChangeSet }, +} + +#[async_trait] +pub trait ConflictSolver { + async fn solve( + &self, + previous_change: &TransactionLog, + previous_repo: &Repository, + current_changes: ChangeSet, + ) -> RepositoryResult; +} + +pub struct NoFastForward(); + +#[async_trait] +impl ConflictSolver for NoFastForward { + async fn solve( + &self, + _previous_change: &TransactionLog, + _previous_repo: &Repository, + current_changes: ChangeSet, + ) -> RepositoryResult { + Ok(ConflictResolution::Failure { + reason: vec![UnsolvableConflict::NoFastForwardConfigured], + unmodified: current_changes, + }) + } +} + +pub enum VersionSelection { + Fail, + Ours, + Theirs, +} + +pub struct BasicConflictSolver { + pub on_chunk_write_conflicts: VersionSelection, + pub on_user_attributes_conflict: VersionSelection, + pub on_zarr_metadata_conflict: VersionSelection, + pub on_group_creation_conflict: VersionSelection, + pub on_array_creation_conflict: VersionSelection, +} + +impl Default for BasicConflictSolver { + fn default() -> Self { + Self { + on_chunk_write_conflicts: VersionSelection::Fail, + on_user_attributes_conflict: VersionSelection::Fail, + on_zarr_metadata_conflict: VersionSelection::Fail, + on_group_creation_conflict: VersionSelection::Fail, + on_array_creation_conflict: VersionSelection::Fail, + } + } +} + +#[async_trait] +impl ConflictSolver for BasicConflictSolver { + async fn solve( + &self, + previous_change: &TransactionLog, + previous_repo: &Repository, + mut current_changes: ChangeSet, + ) -> RepositoryResult { + let write_chunk_to_deleted_array_conflicts = current_changes + .written_arrays() + .filter_map(|node_id| previous_change.deleted_arrays.get(node_id)) + .cloned() + .collect::>(); + + let mut write_to_written_chunks_conflicts = HashMap::new(); + for (node, indices_written) in current_changes.all_modified_chunks_iterator() { + if let Some(previous_updates) = previous_change.updated_chunks.get(node) { + let conflicts = indices_written + .filter(|idx| previous_updates.contains(idx)) + .cloned() + .collect::>(); + if !conflicts.is_empty() { + let path = find_path(node, previous_repo) + .await? + .expect("Bug in conflict detection"); + write_to_written_chunks_conflicts.insert((*node, path), conflicts); + } + } + } + + let mut both_updated_attributes_conflicts = Vec::new(); + for node in previous_change + .updated_user_attributes + .iter() + .filter(|node| current_changes.has_updated_attributes(node)) + { + let path = + find_path(node, previous_repo).await?.expect("Bug in conflict detection"); + both_updated_attributes_conflicts.push(path); + } + + let mut both_updated_zarr_metadata_conflicts = Vec::new(); + for node in previous_change + .updated_zarr_metadata + .iter() + .filter(|node| current_changes.get_updated_zarr_metadata(**node).is_some()) + { + let path = + find_path(node, previous_repo).await?.expect("Bug in conflict detection"); + both_updated_zarr_metadata_conflicts.push(path); + } + + let both_created_group_conflicts = current_changes + .new_groups() + .filter(|path| previous_change.new_groups.contains_key(path)) + .cloned() + .collect::>(); + + let both_created_array_conflicts = current_changes + .new_arrays() + .filter(|path| previous_change.new_arrays.contains_key(path)) + .cloned() + .collect::>(); + + let mut conflicts = Vec::new(); + + if !write_chunk_to_deleted_array_conflicts.is_empty() { + conflicts.push(UnsolvableConflict::ChunksWrittenToDeletedArrays( + write_chunk_to_deleted_array_conflicts, + )); + } + + let mut delete_our_conflict_chunks = None; + + if !write_to_written_chunks_conflicts.is_empty() { + match self.on_chunk_write_conflicts { + VersionSelection::Fail => { + conflicts.push(UnsolvableConflict::WriteToWrittenChunk( + write_to_written_chunks_conflicts + .into_iter() + .map(|((_node, path), conflicts)| (path, conflicts.len())) + .collect(), + )); + } + VersionSelection::Ours => { + // Nothing to do, our chunks will win + } + VersionSelection::Theirs => { + delete_our_conflict_chunks = Some(write_to_written_chunks_conflicts); + } + } + } + + // let mut delete_our_user_attributes = false; + + // if !both_updated_attributes_conflicts.is_empty() { + // match self.on_user_attributes_conflict { + // VersionSelection::Fail => { + // conflicts.push(UnsolvableConflict::ConflictingUserAttributesUpdate( + // both_updated_attributes_conflicts, + // )); + // } + // VersionSelection::Ours => { + // // Nothing to do, our metadata will win + // } + // VersionSelection::Theirs => { + // delete_our_user_attributes = true; + // } + // } + // } + + // let mut delete_our_updated_zarr_metadata = false; + + // if !both_updated_zarr_metadata_conflicts.is_empty() { + // match self.on_zarr_metadata_conflict { + // VersionSelection::Fail => { + // conflicts.push(UnsolvableConflict::ConflictingZarrMetadataUpdate( + // both_updated_zarr_metadata_conflicts, + // )); + // } + // VersionSelection::Ours => { + // // Nothing to do, our user atts will win + // } + // VersionSelection::Theirs => { + // delete_our_updated_zarr_metadata = true; + // } + // } + // } + + // let mut delete_our_new_groups = false; + + // if !both_created_group_conflicts.is_empty() { + // match self.on_group_creation_conflict { + // VersionSelection::Fail => { + // conflicts.push(UnsolvableConflict::ConflictingGroupCreation( + // both_created_group_conflicts, + // )); + // } + // VersionSelection::Ours => { + // // Nothing to do, our groups will win + // } + // VersionSelection::Theirs => { + // delete_our_new_groups = true; + // } + // } + // } + + // let mut delete_our_new_arrays = false; + + // if !both_created_array_conflicts.is_empty() { + // match self.on_array_creation_conflict { + // VersionSelection::Fail => { + // conflicts.push(UnsolvableConflict::ConflictingArrayCreation( + // both_created_array_conflicts, + // )); + // } + // VersionSelection::Ours => { + // // Nothing to do, our groups will win + // } + // VersionSelection::Theirs => { + // delete_our_new_arrays = true; + // } + // } + // } + + if conflicts.is_empty() { + //fixme + if let Some(chunks) = delete_our_conflict_chunks { + for ((node, path), conflicting_indices) in chunks { + for idx in conflicting_indices.iter() { + current_changes.unset_chunk_ref(node, idx); + } + } + } + + Ok(ConflictResolution::Patched(current_changes)) + } else { + Ok(ConflictResolution::Failure { + reason: conflicts, + unmodified: current_changes, + }) + } + } +} + +async fn find_path(node: &NodeId, repo: &Repository) -> RepositoryResult> { + Ok(repo.list_nodes().await?.find_map(|node_snap| { + if node_snap.id == *node { + Some(node_snap.path) + } else { + None + } + })) +} diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index 361fe164..33a26941 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -19,6 +19,7 @@ use crate::{metadata::DataType, private}; pub mod attributes; pub mod manifest; pub mod snapshot; +pub mod transaction_log; #[serde_as] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] @@ -211,7 +212,7 @@ pub enum IcechunkFormatError { pub type IcechunkResult = Result; -type IcechunkFormatVersion = u16; +pub type IcechunkFormatVersion = u16; pub mod format_constants { use super::IcechunkFormatVersion; @@ -223,6 +224,11 @@ pub mod format_constants { pub const LATEST_ICECHUNK_SNAPSHOT_FORMAT: IcechunkFormatVersion = 0; pub const LATEST_ICECHUNK_SNAPSHOT_CONTENT_TYPE: &str = "application/msgpack"; pub const LATEST_ICECHUNK_SNAPSHOT_VERSION_METADATA_KEY: &str = "ic-sna-fmt-ver"; + + pub const LATEST_ICECHUNK_TRANSACTION_LOG_FORMAT: IcechunkFormatVersion = 0; + pub const LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE: &str = "application/msgpack"; + pub const LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY: &str = + "ic-tx-fmt-ver"; } impl Display for Path { diff --git a/icechunk/src/format/transaction_log.rs b/icechunk/src/format/transaction_log.rs new file mode 100644 index 00000000..51a388cb --- /dev/null +++ b/icechunk/src/format/transaction_log.rs @@ -0,0 +1,52 @@ +use std::collections::{HashMap, HashSet}; + +use serde::{Deserialize, Serialize}; + +use crate::change_set::ChangeSet; + +use super::{format_constants, ChunkIndices, IcechunkFormatVersion, NodeId, Path}; + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct TransactionLog { + // FIXME: better, more stable on-disk format + pub icechunk_transaction_log_format_version: IcechunkFormatVersion, + pub new_groups: HashMap, + pub new_arrays: HashMap, + pub updated_user_attributes: HashSet, + pub updated_zarr_metadata: HashSet, + pub updated_chunks: HashMap>, + pub deleted_paths: HashSet, + pub deleted_groups: HashMap, + pub deleted_arrays: HashMap, +} + +impl From<&ChangeSet> for TransactionLog { + fn from(value: &ChangeSet) -> Self { + let new_groups = value.new_groups.clone(); + let new_arrays = + value.new_arrays.iter().map(|(k, (v, _))| (k.clone(), *v)).collect(); + let updated_user_attributes = value.updated_attributes.keys().copied().collect(); + let updated_zarr_metadata = value.updated_arrays.keys().copied().collect(); + let updated_chunks = value + .set_chunks + .iter() + .map(|(k, v)| (*k, v.keys().cloned().collect())) + .collect(); + let deleted_paths = + value.deleted_arrays.union(&value.deleted_groups).cloned().collect(); + + Self { + icechunk_transaction_log_format_version: + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_FORMAT, + new_groups, + new_arrays, + updated_user_attributes, + updated_zarr_metadata, + updated_chunks, + deleted_paths, + // FIXME: implement + deleted_arrays: HashMap::new(), + deleted_groups: HashMap::new(), + } + } +} diff --git a/icechunk/src/lib.rs b/icechunk/src/lib.rs index e46b48cb..69db6bc8 100644 --- a/icechunk/src/lib.rs +++ b/icechunk/src/lib.rs @@ -18,6 +18,7 @@ //! - The datastructures are represented by concrete types in the [`mod@format`] modules. //! These datastructures use Arrow RecordBatches for representation. pub mod change_set; +pub mod conflict; pub mod format; pub mod metadata; pub mod refs; diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index f44cb7e5..90fba05a 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, iter::{self}, + mem::take, pin::Pin, sync::Arc, }; @@ -18,9 +19,10 @@ pub use crate::{ }, }; use crate::{ + conflict::{ConflictResolution, ConflictSolver, UnsolvableConflict}, format::{ - manifest::VirtualReferenceError, snapshot::ManifestFileInfo, ManifestId, - SnapshotId, + manifest::VirtualReferenceError, snapshot::ManifestFileInfo, + transaction_log::TransactionLog, ManifestId, SnapshotId, }, storage::virtual_ref::{ construct_valid_byte_range, ObjectStoreVirtualChunkResolverConfig, @@ -163,6 +165,8 @@ pub enum RepositoryError { Tag(String), #[error("branch update conflict: `({expected_parent:?}) != ({actual_parent:?})`")] Conflict { expected_parent: Option, actual_parent: Option }, + #[error("cannot rebase snapshot {snapshot} on top of the branch")] + RebaseFailed { snapshot: SnapshotId, conflicts: Vec }, #[error("the repository has been initialized already (default branch exists)")] AlreadyInitialized, #[error("error when handling virtual reference {0}")] @@ -243,7 +247,7 @@ impl Repository { storage: Arc, ) -> Arc { // TODO: allow tuning once we experiment with different configurations - Arc::new(MemCachingStorage::new(storage, 2, 2, 2, 0)) + Arc::new(MemCachingStorage::new(storage, 2, 2, 0, 2, 0)) } fn new( @@ -700,6 +704,63 @@ impl Repository { .await } + pub async fn rebase( + &mut self, + solver: &dyn ConflictSolver, + update_branch_name: &str, + ) -> RepositoryResult<()> { + let ref_data = + fetch_branch_tip(self.storage.as_ref(), update_branch_name).await?; + + if ref_data.snapshot != self.snapshot_id { + let current_snapshot = + self.storage.fetch_snapshot(&ref_data.snapshot).await?; + // FIXME: this should be the whole ancestry not local + let anc = current_snapshot.local_ancestry().map(|meta| meta.id); + let new_commits = iter::once(ref_data.snapshot.clone()) + .chain(anc.take_while(|snap_id| snap_id != &self.snapshot_id)) + .collect::>(); + + // TODO: this clone is expensive + // we currently need it to be able to process commits one by one without modifying the + // changeset in case of failure + let mut changeset = self.change_set.clone(); + + // we need to reverse the iterator to process them in order of oldest first + for snap_id in new_commits.into_iter().rev() { + let tx_log = self.storage.fetch_transaction_log(&snap_id).await?; + let repo = Repository { + config: self.config().clone(), + storage: self.storage.clone(), + snapshot_id: snap_id.clone(), + last_node_id: None, + change_set: ChangeSet::default(), + virtual_resolver: self.virtual_resolver.clone(), + }; + + // TODO: this should probably execute in a worker thread + match solver.solve(&tx_log, &repo, changeset).await? { + ConflictResolution::Patched(patched_changeset) => { + changeset = patched_changeset; + } + ConflictResolution::Failure { reason, .. } => { + return Err(RepositoryError::RebaseFailed { + snapshot: snap_id, + conflicts: reason, + }); + } + } + } + + self.change_set = changeset; + self.snapshot_id = ref_data.snapshot; + Ok(()) + } else { + //FIXME: + todo!() + } + } + pub async fn distributed_commit>( &mut self, update_branch_name: &str, @@ -989,8 +1050,10 @@ async fn distributed_flush>( new_snapshot.metadata.written_at = Utc::now(); let new_snapshot = Arc::new(new_snapshot); + let tx_log = TransactionLog::from(&change_set); let new_snapshot_id = &new_snapshot.metadata.id; storage.write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot)).await?; + storage.write_transaction_log(new_snapshot_id.clone(), Arc::new(tx_log)).await?; Ok(new_snapshot_id.clone()) } @@ -1119,6 +1182,7 @@ mod tests { use std::{error::Error, num::NonZeroU64}; use crate::{ + conflict::{BasicConflictSolver, NoFastForward, VersionSelection}, format::manifest::ChunkInfo, metadata::{ ChunkKeyEncoding, ChunkShape, Codec, DataType, FillValue, StorageTransformer, @@ -2198,6 +2262,270 @@ mod tests { Ok(()) } + #[tokio::test()] + async fn test_rebase_without_fast_forward() -> Result<(), Box> { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut repo = Repository::init(Arc::clone(&storage), false).await?.build(); + + repo.add_group("/".try_into().unwrap()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![5], + data_type: DataType::Int32, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int32(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + }; + + let new_array_path: Path = "/array".try_into().unwrap(); + repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?; + repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + + let mut repo1 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + let mut repo2 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + let conflicting_snap = + repo1.commit("main", "write two chunks with repo 1", None).await?; + + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let result = repo2.rebase(&NoFastForward(), "main").await; + assert!(matches!( + result, + Err(RepositoryError::RebaseFailed { snapshot, conflicts, }) if snapshot == conflicting_snap && conflicts == vec![UnsolvableConflict::NoFastForwardConfigured])); + } else { + panic!("Bad test, it should conflict") + } + + Ok(()) + } + + #[tokio::test()] + async fn test_rebase_fast_forwarding_over_chunk_writes() -> Result<(), Box> + { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut repo = Repository::init(Arc::clone(&storage), false).await?.build(); + + repo.add_group("/".try_into().unwrap()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![5], + data_type: DataType::Int32, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int32(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + }; + + let new_array_path: Path = "/array".try_into().unwrap(); + repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?; + let array_created_snap = + repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + + let mut repo1 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + let conflicting_snap = + repo1.commit("main", "write two chunks with repo 1", None).await?; + + // let's try to create a new commit, that conflicts with the previous one but writes to + // different chunks + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![2]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver::default(); + // different chunks were written so this should fast forward + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![2])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("hello".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "Fail" policy (so it shouldn't recover) + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overriden".into())), + ) + .await?; + + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver::default(); + + let res = repo2.rebase(&solver, "main").await; + let expected_conflicts = + [(new_array_path.clone(), 1_usize)].into_iter().collect(); + assert!(matches!( + res, + Err(RepositoryError::RebaseFailed { snapshot, conflicts, }) + if snapshot == conflicting_snap && + conflicts == vec![UnsolvableConflict::WriteToWrittenChunk(expected_conflicts)])); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "Ours" policy + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overriden".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver { + on_chunk_write_conflicts: VersionSelection::Ours, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![1])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("overriden".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "Theirs" policy + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overriden".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver { + on_chunk_write_conflicts: VersionSelection::Theirs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![1])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("hello".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + Ok(()) + } + #[cfg(test)] mod state_machine_test { use crate::format::snapshot::NodeData; diff --git a/icechunk/src/storage/caching.rs b/icechunk/src/storage/caching.rs index 34a2a6d1..b3a93031 100644 --- a/icechunk/src/storage/caching.rs +++ b/icechunk/src/storage/caching.rs @@ -8,7 +8,8 @@ use quick_cache::sync::Cache; use crate::{ format::{ attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, - AttributesId, ByteRange, ChunkId, ManifestId, SnapshotId, + transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, ManifestId, + SnapshotId, }, private, }; @@ -20,6 +21,7 @@ pub struct MemCachingStorage { backend: Arc, snapshot_cache: Cache>, manifest_cache: Cache>, + transactions_cache: Cache>, attributes_cache: Cache>, chunk_cache: Cache<(ChunkId, ByteRange), Bytes>, } @@ -29,6 +31,7 @@ impl MemCachingStorage { backend: Arc, num_snapshots: u16, num_manifests: u16, + num_transactions: u16, num_attributes: u16, num_chunks: u16, ) -> Self { @@ -36,6 +39,7 @@ impl MemCachingStorage { backend, snapshot_cache: Cache::new(num_snapshots as usize), manifest_cache: Cache::new(num_manifests as usize), + transactions_cache: Cache::new(num_transactions as usize), attributes_cache: Cache::new(num_attributes as usize), chunk_cache: Cache::new(num_chunks as usize), } @@ -88,6 +92,20 @@ impl Storage for MemCachingStorage { } } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + match self.transactions_cache.get_value_or_guard_async(id).await { + Ok(log) => Ok(log), + Err(guard) => { + let log = self.backend.fetch_transaction_log(id).await?; + let _fail_is_ok = guard.insert(Arc::clone(&log)); + Ok(log) + } + } + } + async fn fetch_chunk( &self, id: &ChunkId, @@ -134,6 +152,16 @@ impl Storage for MemCachingStorage { Ok(()) } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + self.backend.write_transaction_log(id.clone(), Arc::clone(&log)).await?; + self.transactions_cache.insert(id, log); + Ok(()) + } + async fn write_chunk(&self, id: ChunkId, bytes: Bytes) -> Result<(), StorageError> { self.backend.write_chunk(id.clone(), bytes.clone()).await?; // we don't pre-populate the chunk cache, there are too many of them for this to be useful @@ -202,7 +230,7 @@ mod test { let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend))); let logging_c: Arc = logging.clone(); - let caching = MemCachingStorage::new(Arc::clone(&logging_c), 0, 2, 0, 0); + let caching = MemCachingStorage::new(Arc::clone(&logging_c), 0, 2, 0, 0, 0); let manifest = Arc::new(vec![ci2].into_iter().collect()); let id = ManifestId::random(); @@ -280,6 +308,7 @@ mod test { 2, 0, 0, + 0, ); // we keep asking for all 3 items, but the cache can only fit 2 diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 904010ad..aa97ea35 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -47,6 +47,17 @@ impl Storage for LoggingStorage { self.backend.fetch_snapshot(id).await } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + self.fetch_log + .lock() + .expect("poison lock") + .push(("fetch_transaction_log".to_string(), id.0.to_vec())); + self.backend.fetch_transaction_log(id).await + } + async fn fetch_attributes( &self, id: &AttributesId, @@ -89,6 +100,14 @@ impl Storage for LoggingStorage { self.backend.write_snapshot(id, table).await } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + self.backend.write_transaction_log(id, log).await + } + async fn write_attributes( &self, id: AttributesId, diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index c63f5351..e59630b3 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -30,7 +30,8 @@ pub use object_store::ObjectStorage; use crate::{ format::{ attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, - AttributesId, ByteRange, ChunkId, ManifestId, SnapshotId, + transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, ManifestId, + SnapshotId, }, private, }; @@ -76,6 +77,10 @@ pub trait Storage: fmt::Debug + private::Sealed { ) -> StorageResult>; // FIXME: format flags async fn fetch_manifests(&self, id: &ManifestId) -> StorageResult>; // FIXME: format flags async fn fetch_chunk(&self, id: &ChunkId, range: &ByteRange) -> StorageResult; // FIXME: format flags + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult>; // FIXME: format flags async fn write_snapshot( &self, @@ -93,6 +98,11 @@ pub trait Storage: fmt::Debug + private::Sealed { table: Arc, ) -> StorageResult<()>; async fn write_chunk(&self, id: ChunkId, bytes: Bytes) -> StorageResult<()>; + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()>; async fn get_ref(&self, ref_key: &str) -> StorageResult; async fn ref_names(&self) -> StorageResult>; diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index ee929391..45477cb0 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -1,8 +1,8 @@ use crate::{ format::{ attributes::AttributesTable, format_constants, manifest::Manifest, - snapshot::Snapshot, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, - ObjectId, SnapshotId, + snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, + ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, }, private, }; @@ -37,6 +37,7 @@ impl From<&ByteRange> for Option { const SNAPSHOT_PREFIX: &str = "snapshots/"; const MANIFEST_PREFIX: &str = "manifests/"; // const ATTRIBUTES_PREFIX: &str = "attributes/"; +const TRANSACTION_PREFIX: &str = "transactions/"; const CHUNK_PREFIX: &str = "chunks/"; const REF_PREFIX: &str = "refs"; @@ -116,6 +117,10 @@ impl ObjectStorage { self.get_path(MANIFEST_PREFIX, id) } + fn get_transaction_path(&self, id: &SnapshotId) -> ObjectPath { + self.get_path(TRANSACTION_PREFIX, id) + } + fn get_chunk_path(&self, id: &ChunkId) -> ObjectPath { self.get_path(CHUNK_PREFIX, id) } @@ -180,6 +185,17 @@ impl Storage for ObjectStorage { Ok(Arc::new(res)) } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + let path = self.get_transaction_path(id); + let bytes = self.store.get(&path).await?.bytes().await?; + // TODO: optimize using from_read + let res = rmp_serde::from_slice(bytes.as_ref())?; + Ok(Arc::new(res)) + } + async fn write_snapshot( &self, id: SnapshotId, @@ -254,6 +270,39 @@ impl Storage for ObjectStorage { Ok(()) } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + let path = self.get_transaction_path(&id); + let bytes = rmp_serde::to_vec(log.as_ref())?; + let attributes = if self.supports_metadata { + Attributes::from_iter(vec![ + ( + Attribute::ContentType, + AttributeValue::from( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE, + ), + ), + ( + Attribute::Metadata(std::borrow::Cow::Borrowed( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY, + )), + AttributeValue::from( + log.icechunk_transaction_log_format_version.to_string(), + ), + ), + ]) + } else { + Attributes::new() + }; + let options = PutOptions { attributes, ..PutOptions::default() }; + // FIXME: use multipart + self.store.put_opts(&path, bytes.into(), options).await?; + Ok(()) + } + async fn fetch_chunk( &self, id: &ChunkId, diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index 543cc00e..b8c0d39e 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -17,8 +17,8 @@ use serde::{Deserialize, Serialize}; use crate::{ format::{ attributes::AttributesTable, format_constants, manifest::Manifest, - snapshot::Snapshot, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, - SnapshotId, + snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, + ChunkId, FileTypeTag, ManifestId, SnapshotId, }, private, zarr::ObjectId, @@ -110,6 +110,7 @@ const SNAPSHOT_PREFIX: &str = "snapshots/"; const MANIFEST_PREFIX: &str = "manifests/"; // const ATTRIBUTES_PREFIX: &str = "attributes/"; const CHUNK_PREFIX: &str = "chunks/"; +const TRANSACTION_PREFIX: &str = "transactions/"; const REF_PREFIX: &str = "refs"; impl S3Storage { @@ -148,6 +149,10 @@ impl S3Storage { self.get_path(CHUNK_PREFIX, id) } + fn get_transaction_path(&self, id: &SnapshotId) -> StorageResult { + self.get_path(TRANSACTION_PREFIX, id) + } + fn ref_key(&self, ref_key: &str) -> StorageResult { let path = PathBuf::from_iter([self.prefix.as_str(), REF_PREFIX, ref_key]); path.into_os_string().into_string().map_err(StorageError::BadPrefix) @@ -243,6 +248,17 @@ impl Storage for S3Storage { Ok(Arc::new(res)) } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + let key = self.get_transaction_path(id)?; + let bytes = self.get_object(key.as_str()).await?; + // TODO: optimize using from_read + let res = rmp_serde::from_slice(bytes.as_ref())?; + Ok(Arc::new(res)) + } + async fn fetch_chunk(&self, id: &ChunkId, range: &ByteRange) -> StorageResult { let key = self.get_chunk_path(id)?; let bytes = self.get_object_range(key.as_str(), range).await?; @@ -297,6 +313,26 @@ impl Storage for S3Storage { .await } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + let key = self.get_transaction_path(&id)?; + let bytes = rmp_serde::to_vec(log.as_ref())?; + let metadata = [( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY, + log.icechunk_transaction_log_format_version.to_string(), + )]; + self.put_object( + key.as_str(), + Some(format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE), + metadata, + bytes, + ) + .await + } + async fn write_chunk( &self, id: ChunkId,