Skip to content

Commit

Permalink
Perform minor updates and corrections across library.
Browse files Browse the repository at this point in the history
Fix README typos.

Use non-blocking synchronization for inmemory zipstore.

Update PicosFSStore function in example.

Use type annotations for MemoryStore.

Style updates to storage implementation.

Update documentation.

fix slice
  • Loading branch information
zoj613 committed Oct 4, 2024
1 parent e8d9d39 commit 7e97b9d
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 190 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ List.map Node.Group.to_path g;;
FilesystemStore.Group.delete store group_node;;
FilesystemStore.clear store;; (* clears the store *)
FilesystemStore.Group.rename store group_node;;
FilesystemStore.Array.rename store anode;;
FilesystemStore.Group.rename store group_node "new_name";;
FilesystemStore.Array.rename store anode "new_name";;
```

[1]: https://codecov.io/gh/zoj613/zarr-ml/graph/badge.svg?token=KOOG2Y1SH5
Expand Down
118 changes: 72 additions & 46 deletions examples/inmemory_zipstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,23 @@
in your shell at the root of this project. *)

module ZipStore : sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t
val with_open : ?level:Zipc_deflate.level -> string -> (t -> 'a Deferred.t) -> 'a Deferred.t
include Zarr.Storage.STORE with module Deferred = Zarr_lwt.Deferred
val with_open : ?level:Zipc_deflate.level -> Unix.file_perm -> string -> (t -> 'a Deferred.t) -> 'a Deferred.t
end = struct
module M = Map.Make(String)

module Z = struct
module Deferred = Zarr_lwt.Deferred
open Deferred.Infix
open Deferred.Syntax

type t =
{mutable ic : Zipc.t
;mutex : Lwt_mutex.t
;level : Zipc_deflate.level
;path : string}
type t = {ic : Zipc.t Atomic.t; level : Zipc_deflate.level}

let is_member t key =
Deferred.return @@ Zipc.mem key t.ic
Deferred.return @@ Zipc.mem key @@ Atomic.get t.ic

let size t key =
Deferred.return @@
match Zipc.find key t.ic with
Deferred.return @@
match Zipc.find key @@ Atomic.get t.ic with
| None -> 0
| Some m ->
match Zipc.Member.kind m with
Expand All @@ -45,7 +40,7 @@ end = struct

let get t key =
Deferred.return @@
match Zipc.find key t.ic with
match Zipc.find key @@ Atomic.get t.ic with
| None -> raise (Zarr.Storage.Key_not_found key)
| Some m ->
match Zipc.Member.kind m with
Expand All @@ -65,12 +60,12 @@ end = struct
(fun m acc ->
match Zipc.Member.kind m with
| Zipc.Member.Dir -> acc
| Zipc.Member.File _ -> Zipc.Member.path m :: acc) t.ic []
| Zipc.Member.File _ -> Zipc.Member.path m :: acc) (Atomic.get t.ic) []

let list_dir t prefix =
let module S = Set.Make(String) in
let n = String.length prefix in
let m = Zipc.to_string_map t.ic in
let m = Zipc.to_string_map @@ Atomic.get t.ic in
let prefs, keys =
M.fold
(fun key v ((l, r) as acc) ->
Expand All @@ -85,17 +80,18 @@ end = struct
| _ -> acc) m (S.empty, [])
in Deferred.return (keys, S.elements prefs)

let set t key value =
let rec set t key value =
match Zipc.File.deflate_of_binary_string ~level:t.level value with
| Error e -> failwith e
| Ok f ->
match Zipc.Member.(make ~path:key @@ File f) with
| Error e -> failwith e
| Ok m ->
Lwt_mutex.with_lock t.mutex @@ fun () ->
Deferred.return (t.ic <- Zipc.add m t.ic)
let z = Atomic.get t.ic in
if Atomic.compare_and_set t.ic z @@ Zipc.add m z
then Deferred.return_unit else set t key value

let set_partial_values t key ?(append=false) rv =
let rec set_partial_values t key ?(append=false) rv =
let f =
if append then
fun acc (_, v) ->
Expand All @@ -106,46 +102,76 @@ end = struct
String.(length v |> Bytes.blit_string v 0 s rs);
Deferred.return @@ Bytes.unsafe_to_string s
in
match Zipc.Member.kind (Option.get @@ Zipc.find key t.ic) with
let z = Atomic.get t.ic in
match Zipc.Member.kind (Option.get @@ Zipc.find key z) with
| Zipc.Member.Dir -> Deferred.return_unit
| Zipc.Member.File file ->
match Zipc.File.to_binary_string file with
| Error e -> failwith e
| Ok s -> Deferred.fold_left f s rv >>= set t key

let erase t key =
Lwt_mutex.with_lock t.mutex @@ fun () ->
Deferred.return (t.ic <- Zipc.remove key t.ic)

let erase_prefix t prefix =
let m = Zipc.to_string_map t.ic in
| Ok s ->
let* value = Deferred.fold_left f s rv in
match Zipc.File.deflate_of_binary_string ~level:t.level value with
| Error e -> failwith e
| Ok f ->
match Zipc.Member.(make ~path:key @@ File f) with
| Error e -> failwith e
| Ok m ->
if Atomic.compare_and_set t.ic z @@ Zipc.add m z
then Deferred.return_unit else set_partial_values t key ~append rv

let rec erase t key =
let z = Atomic.get t.ic in
let z' = Zipc.remove key z in
if Atomic.compare_and_set t.ic z z'
then Deferred.return_unit else erase t key

let rec erase_prefix t prefix =
let z = Atomic.get t.ic in
let m = Zipc.to_string_map z in
let m' = M.filter_map
(fun k v -> if String.starts_with ~prefix k then None else Some v) m in
Lwt_mutex.with_lock t.mutex @@ fun () ->
Deferred.return (t.ic <- Zipc.of_string_map m')

let rename t ok nk =
Lwt_mutex.with_lock t.mutex @@ fun () ->
let m = Zipc.to_string_map t.ic in
let m1, m2 = M.partition (fun k _ -> String.starts_with ~prefix:ok k) m in
let l = String.length ok in
let s = Seq.map
(fun (k, v) -> nk ^ String.(length k - l |> sub k l), v) @@ M.to_seq m1 in
t.ic <- Zipc.of_string_map @@ M.add_seq s m2; Lwt.return_unit
let z' = Zipc.of_string_map m' in
if Atomic.compare_and_set t.ic z z'
then Deferred.return_unit else erase_prefix t prefix

(* Adapted from: https://github.com/dbuenzli/zipc/issues/8#issuecomment-2392417890 *)
let rec rename t prefix new_prefix =
let rename_member ~prefix ~new_prefix m =
let path = Zipc.Member.path m in
if not (String.starts_with ~prefix path) then m else
let l = String.length prefix in
let path = new_prefix ^ String.sub path l (String.length path - l) in
let mtime = Zipc.Member.mtime m in
let mode = Zipc.Member.mode m in
let kind = Zipc.Member.kind m in
match Zipc.Member.make ~mtime ~mode ~path kind with
| Ok m' -> m' | Error e -> failwith e
in
let z = Atomic.get t.ic in
let add m acc = Zipc.add (rename_member ~prefix ~new_prefix m) acc in
let z' = Zipc.fold add z Zipc.empty in
if Atomic.compare_and_set t.ic z z'
then Deferred.return_unit else rename t prefix new_prefix
end
(* this functor generates the public signature of our Zip file store. *)
include Zarr.Storage.Make(Z)

let with_open ?(level=`Default) path f =
let with_open ?(level=`Default) perm path f =
let s = In_channel.(with_open_bin path input_all) in
let t = match Zipc.of_binary_string s with
| Ok ic -> Z.{ic; level; path; mutex = Lwt_mutex.create ()}
let x = match Zipc.of_binary_string s with
| Ok z -> Z.{ic = Atomic.make z; level}
| Error e -> failwith e
in
Lwt.finalize (fun () -> f t) @@ fun () ->
let flags = Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK] in
Lwt_io.with_file ~flags ~mode:Lwt_io.Output t.path @@ fun oc ->
Result.fold ~error:failwith ~ok:(Lwt_io.write oc) @@ Zipc.to_binary_string t.ic
let open Deferred.Syntax in
let+ out = f x in
let flags = [Open_wronly; Open_trunc; Open_creat] in
match Zipc.to_binary_string @@ Atomic.get x.ic with
| Error e -> failwith e
| Ok v ->
Out_channel.with_open_gen flags perm path @@ fun oc ->
Out_channel.output_string oc v;
Out_channel.flush oc;
out
end

let _ =
Expand All @@ -155,7 +181,7 @@ let _ =
let open Zarr.Indexing in
let open ZipStore.Deferred.Syntax in

ZipStore.with_open "examples/data/testdata.zip" @@ fun store ->
ZipStore.with_open 0o700 "examples/data/testdata.zip" @@ fun store ->
let* xs, _ = ZipStore.hierarchy store in
let anode = List.hd @@ List.filter
(fun node -> Node.Array.to_path node = "/some/group/name") xs in
Expand Down
18 changes: 9 additions & 9 deletions examples/picos_fs_store.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
module PU = Picos_io.Unix

module PicosFSStore : sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
include Zarr.Storage.STORE with module Deferred = Zarr_sync.Deferred
val create : ?perm:Unix.file_perm -> string -> t
end = struct

Expand Down Expand Up @@ -129,28 +129,28 @@ end

let _ =
Picos_mux_random.run_on ~n_domains:1 @@ fun () ->
let open Zarr.Node in
let open Zarr in
let open Zarr.Codecs in
let open Zarr.Ndarray in
let open Zarr.Indexing in

let store = PicosFSStore.create "picosdata.zarr" in
let gnode = GroupNode.of_path "/some/group" in
PicosFSStore.create_group store gnode;
let anode = ArrayNode.(gnode / "name") in
let gnode = Node.Group.of_path "/some/group" in
PicosFSStore.Group.create store gnode;
let anode = Node.Array.(gnode / "name") in
let config =
{chunk_shape = [|5; 3; 5|]
;codecs = [`Bytes LE; `Gzip L5]
;index_codecs = [`Bytes BE; `Crc32c]
;index_location = Start} in
PicosFSStore.create_array
PicosFSStore.Array.create
~codecs:[`ShardingIndexed config]
~shape:[|100; 100; 50|]
~chunks:[|10; 15; 20|]
Char '?' anode store;
let slice = [|R [|0; 20|]; I 10; R [||]|] in
let x = PicosFSStore.read_array store anode slice Char in
let x = PicosFSStore.Array.read store anode slice Char in
let x' = Zarr.Ndarray.map (fun _ -> Random.int 256 |> Char.chr) x in
PicosFSStore.write_array store anode slice x';
let y = PicosFSStore.read_array store anode slice Char in
PicosFSStore.Array.write store anode slice x';
let y = PicosFSStore.Array.read store anode slice Char in
assert (equal x' y)
18 changes: 9 additions & 9 deletions examples/readonly_zipstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
raise a Not_implemented exception for the set_* and erase_* family of
functions. This effectively allows us to create a read-only store since
calling any of the following functions would result in error:
- ZipStore.create_group
- ZipStore.create_array
- ZipStore.erase_group_node
- ZipStore.erase_array_node
- ZipStore.Group.create
- ZipStore.Array.create
- ZipStore.Group.delete
- ZipStore.Array.delete
- ZipStore.clear
- ZipStore.write_array
- ZipStore.reshape
- ZipStore.rename_array
- ZipStore.rename_group
- ZipStore.Array.write
- ZipStore.Array.reshape
- ZipStore.Array.rename
- ZipStore.Group.rename
Below we show how to implement this custom Zarr Store.
To compile & run this example execute the command
Expand All @@ -22,7 +22,7 @@

module ZipStore : sig
exception Not_implemented
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
include Zarr.Storage.STORE with module Deferred = Zarr_eio.Deferred
val with_open : string -> (t -> 'a) -> 'a
end = struct
exception Not_implemented
Expand Down
26 changes: 13 additions & 13 deletions zarr/src/node.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ module Group : sig

val create : t -> string -> t
(** [create p n] returns a group node with parent [p] and name [n].
@raise Failure if node invariants are not satisfied. *)
@raise Node_invariant if node invariants are not satisfied. *)

val ( / ) : t -> string -> t
(** The infix operator alias of {!create} *)

val of_path : string -> t
(** [of_path s] returns a node from string [s].
@raise Failure if node invariants are not satisfied. *)
@raise Node_invariant if node invariants are not satisfied. *)

val to_path : t -> string
(** [to_path n] returns node [n] as a string path. *)
Expand All @@ -52,7 +52,7 @@ module Group : sig
val ancestors : t -> t list
(** [ancestors n] returns ancestor nodes of [n] including the root node.
The root node has no ancestors, thus this returns the empty list
is called on a root node. *)
if called on a root node. *)

val to_key : t -> string
(** [to_key n] converts a node's path to a key, as defined in the Zarr V3
Expand All @@ -63,25 +63,25 @@ module Group : sig
in the Zarr V3 specification. *)

val to_metakey : t -> string
(** [to_prefix n] returns the metadata key associated with node [n],
(** [to_metakey n] returns the metadata key associated with node [n],
as defined in the Zarr V3 specification. *)

val is_child_group : t -> t -> bool
(** [is_child_group m n] Tests if group node [m] is a the immediate parent of
(** [is_child_group m n] Tests if group node [m] is the immediate parent of
group node [n]. Returns [true] when the test passes and [false] otherwise. *)

val show : t -> string
(** [show n] returns a string representation of a node type. *)
(** [show n] returns a string representation of a node type.*)

val pp : Format.formatter -> t -> unit
(** [pp fmt t] pretty prints a node type value. *)
(** [pp fmt t] pretty prints a node type value.*)

val rename : t -> string -> t
(** [rename t s] returns a new group node with all properties of [t]
but with its name changed to [s].
@raise Node_invariant if [s] is invalid name.
@raise Renaming_root if [t] is a root node.*)
@raise Cannot_rename_root if [t] is a root node.*)
end

module Array : sig
Expand All @@ -90,17 +90,17 @@ module Array : sig

val create : Group.t -> string -> t
(** [create p n] returns an array node with parent [p] and name [n].
@raise Failure if node invariants are not satisfied. *)
@raise Node_invariant if node invariants are not satisfied. *)

val ( / ) : Group.t -> string -> t
(** The infix operator alias of {!ArrayNode.create} *)
(** The infix operator alias of {!create} *)

val root : t
(** creates an array root node *)

val of_path : string -> t
(** [of_path s] returns an array node from string [s].
@raise Failure if node invariants are not satisfied. *)
@raise Node_invariant if node invariants are not satisfied. *)

val to_path : t -> string
(** [to_path n] returns array node [n] as a string path. *)
Expand Down Expand Up @@ -128,7 +128,7 @@ module Array : sig
specification. *)

val to_metakey : t -> string
(** [to_prefix n] returns the metadata key associated with node [n],
(** [to_metakey n] returns the metadata key associated with node [n],
as defined in the Zarr V3 specification. *)

val show : t -> string
Expand All @@ -142,5 +142,5 @@ module Array : sig
but with its name changed to [s].
@raise Node_invariant if [s] is invalid name.
@raise Renaming_root if [t] is a root node.*)
@raise Cannot_rename_root if [t] is a root node.*)
end
Loading

0 comments on commit 7e97b9d

Please sign in to comment.