Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba committed Oct 22, 2024
1 parent aa7f8ac commit cab1ef0
Show file tree
Hide file tree
Showing 12 changed files with 850 additions and 21 deletions.
4 changes: 2 additions & 2 deletions icechunk/examples/low_level_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use icechunk::{
ChunkIndices, ChunkKeyEncoding, ChunkPayload, ChunkShape, Codec, DataType,
FillValue, Path, StorageTransformer, UserAttributes, ZarrArrayMetadata,
},
storage::{MemCachingStorage, ObjectStorage},
storage::ObjectStorage,
zarr::StoreError,
Repository, Storage,
};
Expand All @@ -30,7 +30,7 @@ let mut ds = Repository::create(Arc::clone(&storage));
let storage: Arc<dyn Storage + Send + Sync> =
Arc::new(ObjectStorage::new_in_memory_store(None));
let mut ds = Repository::init(
Arc::new(MemCachingStorage::new(Arc::clone(&storage), 2, 2, 0, 0)),
Repository::add_in_mem_asset_caching(Arc::clone(&storage)),
false,
)
.await?
Expand Down
40 changes: 32 additions & 8 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ use crate::{

/// In-memory record of the uncommitted modifications of a repository session.
///
/// Fields are `pub(crate)` so sibling modules (e.g. conflict resolution)
/// can inspect the raw maps directly.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct ChangeSet {
    /// Groups created in this session, keyed by path.
    pub(crate) new_groups: HashMap<Path, NodeId>,
    /// Arrays created in this session, with their node id and Zarr metadata.
    pub(crate) new_arrays: HashMap<Path, (NodeId, ZarrArrayMetadata)>,
    /// Zarr metadata updates applied to pre-existing arrays.
    pub(crate) updated_arrays: HashMap<NodeId, ZarrArrayMetadata>,
    // These paths may point to Arrays or Groups,
    // since both Groups and Arrays support UserAttributes
    // NOTE(review): presumably `None` records an attribute deletion — confirm.
    pub(crate) updated_attributes: HashMap<NodeId, Option<UserAttributes>>,
    // FIXME: issue with too many inline chunks kept in mem
    // Per-array chunk writes, keyed by chunk coordinates.
    // NOTE(review): presumably a `None` payload records a chunk deletion — confirm.
    pub(crate) set_chunks: HashMap<NodeId, HashMap<ChunkIndices, Option<ChunkPayload>>>,
    /// Pre-existing groups deleted in this session.
    pub(crate) deleted_groups: HashSet<Path>,
    /// Pre-existing arrays deleted in this session.
    pub(crate) deleted_arrays: HashSet<Path>,
}

impl ChangeSet {
/// Ids of every node that has at least one chunk change recorded in this
/// change set.
pub fn written_arrays(&self) -> impl Iterator<Item = &NodeId> {
    self.set_chunks.iter().map(|(node_id, _chunks)| node_id)
}

pub fn is_empty(&self) -> bool {
self == &ChangeSet::default()
}
Expand Down Expand Up @@ -172,6 +176,12 @@ impl ChangeSet {
self.set_chunks.get(&node_id).and_then(|h| h.get(coords))
}

/// Drop any chunk change recorded for `coord` in array `node_id`.
///
/// A no-op when the node or the coordinate has no recorded change;
/// it never inserts a new entry for `node_id`.
pub fn unset_chunk_ref(&mut self, node_id: NodeId, coord: &ChunkIndices) {
    if let Some(node_chunks) = self.set_chunks.get_mut(&node_id) {
        node_chunks.remove(coord);
    }
}

pub fn array_chunks_iterator(
&self,
node_id: NodeId,
Expand Down Expand Up @@ -205,8 +215,22 @@ impl ChangeSet {
})
}

/// Iterate over every node with chunk changes, yielding the node id
/// together with an iterator over the coordinates of its changed chunks.
pub fn all_modified_chunks_iterator(
    &self,
) -> impl Iterator<Item = (&NodeId, impl Iterator<Item = &ChunkIndices>)> + '_ {
    self.set_chunks
        .iter()
        .map(|(node_id, chunk_changes)| (node_id, chunk_changes.keys()))
}

/// Paths of every node (group or array) created in this change set.
pub fn new_nodes(&self) -> impl Iterator<Item = &Path> {
    self.new_groups().chain(self.new_arrays())
}

/// Paths of the groups created in this change set.
pub fn new_groups(&self) -> impl Iterator<Item = &Path> {
    self.new_groups.keys()
}

/// Paths of the arrays created in this change set.
pub fn new_arrays(&self) -> impl Iterator<Item = &Path> {
    self.new_arrays.keys()
}

pub fn take_chunks(
Expand Down
275 changes: 275 additions & 0 deletions icechunk/src/conflict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
use std::collections::HashMap;

use async_trait::async_trait;

use crate::{
change_set::ChangeSet,
format::{transaction_log::TransactionLog, NodeId, Path},
repository::RepositoryResult,
Repository,
};

/// Reasons why a change set cannot be automatically reconciled with a
/// previously committed transaction.
#[derive(Debug, PartialEq, Eq)]
pub enum UnsolvableConflict {
    /// The configured solver never fast-forwards (see `NoFastForward`).
    NoFastForwardConfigured,
    /// We wrote chunks to arrays that the previous change deleted.
    ChunksWrittenToDeletedArrays(Vec<Path>),
    /// Both changes wrote the same chunks; maps the array path to the
    /// number of conflicting chunk writes.
    WriteToWrittenChunk(HashMap<Path, usize>),
    /// Both changes updated user attributes of the same nodes.
    ConflictingUserAttributesUpdate(Vec<Path>),
    /// Both changes updated the Zarr metadata of the same arrays.
    ConflictingZarrMetadataUpdate(Vec<Path>),
    /// Both changes created a group at the same path.
    ConflictingGroupCreation(Vec<Path>),
    /// Both changes created an array at the same path.
    ConflictingArrayCreation(Vec<Path>),
}

/// Outcome of a `ConflictSolver::solve` call.
#[derive(Debug)]
pub enum ConflictResolution {
    /// The change set was rewritten so it no longer conflicts.
    Patched(ChangeSet),
    /// The conflicts could not be solved; `unmodified` hands the caller's
    /// change set back untouched.
    Failure { reason: Vec<UnsolvableConflict>, unmodified: ChangeSet },
}

/// Strategy for reconciling a session's [`ChangeSet`] with a transaction
/// that was committed in the meantime.
#[async_trait]
pub trait ConflictSolver {
    /// Compare `current_changes` against `previous_change` (the log of the
    /// already-committed transaction, with `previous_repo` at that state)
    /// and either patch the change set so it can be applied, or report why
    /// it cannot be.
    async fn solve(
        &self,
        previous_change: &TransactionLog,
        previous_repo: &Repository,
        current_changes: ChangeSet,
    ) -> RepositoryResult<ConflictResolution>;
}

/// A [`ConflictSolver`] that never reconciles anything: every call fails
/// with [`UnsolvableConflict::NoFastForwardConfigured`] and hands the
/// change set back untouched.
pub struct NoFastForward();

#[async_trait]
impl ConflictSolver for NoFastForward {
    async fn solve(
        &self,
        _previous_change: &TransactionLog,
        _previous_repo: &Repository,
        current_changes: ChangeSet,
    ) -> RepositoryResult<ConflictResolution> {
        let reason = vec![UnsolvableConflict::NoFastForwardConfigured];
        Ok(ConflictResolution::Failure { reason, unmodified: current_changes })
    }
}

/// Which side wins when both changes touched the same item.
pub enum VersionSelection {
    /// Treat the overlap as an unsolvable conflict.
    Fail,
    /// Keep our version (the current, uncommitted session's changes).
    Ours,
    /// Keep their version (the previously committed change).
    Theirs,
}

/// A [`ConflictSolver`] configured with one [`VersionSelection`] policy
/// per conflict category.
pub struct BasicConflictSolver {
    /// Policy when both sides wrote the same chunk of the same array.
    pub on_chunk_write_conflicts: VersionSelection,
    /// Policy when both sides updated a node's user attributes.
    pub on_user_attributes_conflict: VersionSelection,
    /// Policy when both sides updated an array's Zarr metadata.
    pub on_zarr_metadata_conflict: VersionSelection,
    /// Policy when both sides created a group at the same path.
    pub on_group_creation_conflict: VersionSelection,
    /// Policy when both sides created an array at the same path.
    pub on_array_creation_conflict: VersionSelection,
}

impl Default for BasicConflictSolver {
fn default() -> Self {
Self {
on_chunk_write_conflicts: VersionSelection::Fail,
on_user_attributes_conflict: VersionSelection::Fail,
on_zarr_metadata_conflict: VersionSelection::Fail,
on_group_creation_conflict: VersionSelection::Fail,
on_array_creation_conflict: VersionSelection::Fail,
}
}
}

#[async_trait]
impl ConflictSolver for BasicConflictSolver {
    /// Detect conflicts between `current_changes` and `previous_change` and
    /// apply the per-category [`VersionSelection`] policies.
    ///
    /// Fixes a wip defect: the attribute/metadata/creation conflict lists
    /// were computed and then silently discarded (their handling was
    /// commented out), so the solver reported `Patched` even when the
    /// default `Fail` policy should have rejected the solve.
    async fn solve(
        &self,
        previous_change: &TransactionLog,
        previous_repo: &Repository,
        mut current_changes: ChangeSet,
    ) -> RepositoryResult<ConflictResolution> {
        // Chunks we wrote to arrays the previous change deleted can never
        // be reconciled automatically.
        let write_chunk_to_deleted_array_conflicts = current_changes
            .written_arrays()
            .filter_map(|node_id| previous_change.deleted_arrays.get(node_id))
            .cloned()
            .collect::<Vec<_>>();

        // Chunk coordinates both sides wrote, keyed by (node id, path).
        let mut write_to_written_chunks_conflicts = HashMap::new();
        for (node, indices_written) in current_changes.all_modified_chunks_iterator() {
            if let Some(previous_updates) = previous_change.updated_chunks.get(node) {
                let conflicts = indices_written
                    .filter(|idx| previous_updates.contains(idx))
                    .cloned()
                    .collect::<Vec<_>>();
                if !conflicts.is_empty() {
                    let path = find_path(node, previous_repo)
                        .await?
                        .expect("Bug in conflict detection");
                    write_to_written_chunks_conflicts.insert((*node, path), conflicts);
                }
            }
        }

        // Nodes whose user attributes both sides updated.
        let mut both_updated_attributes_conflicts = Vec::new();
        for node in previous_change
            .updated_user_attributes
            .iter()
            .filter(|node| current_changes.has_updated_attributes(node))
        {
            let path =
                find_path(node, previous_repo).await?.expect("Bug in conflict detection");
            both_updated_attributes_conflicts.push(path);
        }

        // Arrays whose Zarr metadata both sides updated.
        let mut both_updated_zarr_metadata_conflicts = Vec::new();
        for node in previous_change
            .updated_zarr_metadata
            .iter()
            .filter(|node| current_changes.get_updated_zarr_metadata(**node).is_some())
        {
            let path =
                find_path(node, previous_repo).await?.expect("Bug in conflict detection");
            both_updated_zarr_metadata_conflicts.push(path);
        }

        // Paths where both sides created a group / an array.
        let both_created_group_conflicts = current_changes
            .new_groups()
            .filter(|path| previous_change.new_groups.contains_key(path))
            .cloned()
            .collect::<Vec<_>>();

        let both_created_array_conflicts = current_changes
            .new_arrays()
            .filter(|path| previous_change.new_arrays.contains_key(path))
            .cloned()
            .collect::<Vec<_>>();

        let mut conflicts = Vec::new();

        if !write_chunk_to_deleted_array_conflicts.is_empty() {
            conflicts.push(UnsolvableConflict::ChunksWrittenToDeletedArrays(
                write_chunk_to_deleted_array_conflicts,
            ));
        }

        // When `Theirs` wins the chunk conflicts we must drop our writes
        // from the change set, but only once we know the whole solve
        // succeeded — remember them until then.
        let mut delete_our_conflict_chunks = None;

        if !write_to_written_chunks_conflicts.is_empty() {
            match self.on_chunk_write_conflicts {
                VersionSelection::Fail => {
                    conflicts.push(UnsolvableConflict::WriteToWrittenChunk(
                        write_to_written_chunks_conflicts
                            .into_iter()
                            .map(|((_node, path), conflicts)| (path, conflicts.len()))
                            .collect(),
                    ));
                }
                VersionSelection::Ours => {
                    // Nothing to do, our chunks will win
                }
                VersionSelection::Theirs => {
                    delete_our_conflict_chunks = Some(write_to_written_chunks_conflicts);
                }
            }
        }

        // FIXME: patching the change set for the categories below
        // (`Theirs` resolution) is not implemented yet. Until it is, any
        // policy other than `Ours` reports the conflict instead of
        // silently keeping our version.
        if !both_updated_attributes_conflicts.is_empty() {
            match self.on_user_attributes_conflict {
                VersionSelection::Ours => {
                    // Nothing to do, our attributes will win
                }
                VersionSelection::Fail | VersionSelection::Theirs => {
                    conflicts.push(UnsolvableConflict::ConflictingUserAttributesUpdate(
                        both_updated_attributes_conflicts,
                    ));
                }
            }
        }

        if !both_updated_zarr_metadata_conflicts.is_empty() {
            match self.on_zarr_metadata_conflict {
                VersionSelection::Ours => {
                    // Nothing to do, our metadata will win
                }
                VersionSelection::Fail | VersionSelection::Theirs => {
                    conflicts.push(UnsolvableConflict::ConflictingZarrMetadataUpdate(
                        both_updated_zarr_metadata_conflicts,
                    ));
                }
            }
        }

        if !both_created_group_conflicts.is_empty() {
            match self.on_group_creation_conflict {
                VersionSelection::Ours => {
                    // Nothing to do, our groups will win
                }
                VersionSelection::Fail | VersionSelection::Theirs => {
                    conflicts.push(UnsolvableConflict::ConflictingGroupCreation(
                        both_created_group_conflicts,
                    ));
                }
            }
        }

        if !both_created_array_conflicts.is_empty() {
            match self.on_array_creation_conflict {
                VersionSelection::Ours => {
                    // Nothing to do, our arrays will win
                }
                VersionSelection::Fail | VersionSelection::Theirs => {
                    conflicts.push(UnsolvableConflict::ConflictingArrayCreation(
                        both_created_array_conflicts,
                    ));
                }
            }
        }

        if conflicts.is_empty() {
            // Patch the change set only now that we know nothing failed.
            if let Some(chunks) = delete_our_conflict_chunks {
                for ((node, _path), conflicting_indices) in chunks {
                    for idx in conflicting_indices.iter() {
                        current_changes.unset_chunk_ref(node, idx);
                    }
                }
            }

            Ok(ConflictResolution::Patched(current_changes))
        } else {
            Ok(ConflictResolution::Failure {
                reason: conflicts,
                unmodified: current_changes,
            })
        }
    }
}

/// Look up the path of `node` in `repo`'s current node listing.
///
/// Returns `Ok(None)` when no node with that id exists.
async fn find_path(node: &NodeId, repo: &Repository) -> RepositoryResult<Option<Path>> {
    for node_snap in repo.list_nodes().await? {
        if node_snap.id == *node {
            return Ok(Some(node_snap.path));
        }
    }
    Ok(None)
}
8 changes: 7 additions & 1 deletion icechunk/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{metadata::DataType, private};
pub mod attributes;
pub mod manifest;
pub mod snapshot;
pub mod transaction_log;

#[serde_as]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -211,7 +212,7 @@ pub enum IcechunkFormatError {

pub type IcechunkResult<T> = Result<T, IcechunkFormatError>;

type IcechunkFormatVersion = u16;
pub type IcechunkFormatVersion = u16;

pub mod format_constants {
use super::IcechunkFormatVersion;
Expand All @@ -223,6 +224,11 @@ pub mod format_constants {
pub const LATEST_ICECHUNK_SNAPSHOT_FORMAT: IcechunkFormatVersion = 0;
pub const LATEST_ICECHUNK_SNAPSHOT_CONTENT_TYPE: &str = "application/msgpack";
pub const LATEST_ICECHUNK_SNAPSHOT_VERSION_METADATA_KEY: &str = "ic-sna-fmt-ver";

pub const LATEST_ICECHUNK_TRANSACTION_LOG_FORMAT: IcechunkFormatVersion = 0;
pub const LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE: &str = "application/msgpack";
pub const LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY: &str =
"ic-tx-fmt-ver";
}

impl Display for Path {
Expand Down
Loading

0 comments on commit cab1ef0

Please sign in to comment.