-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add an example implementing a custom ZipStore.
This adds an example showing how one can use the library to create their own custom Zarr store and the minimum requirements needed to be satisfied.
- Loading branch information
Showing
5 changed files
with
317 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
(executable | ||
(name readonly_zipstore) | ||
(modules readonly_zipstore) | ||
(ocamlopt_flags (:standard -O3)) | ||
(libraries zarr-eio camlzip)) | ||
|
||
(executable | ||
(name inmemory_zipstore) | ||
(modules inmemory_zipstore) | ||
(ocamlopt_flags (:standard -O3)) | ||
(libraries zarr-lwt zipc) | ||
(preprocess | ||
(pps ppx_deriving.show))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
(* This module implements a Zip file zarr store that is Lwt-aware. | ||
It supports both read and write operations. This is because the | ||
underlying Zip library used reads all Zip file bytes into memory. All | ||
store updates are done in-memory and thus to update the actual zip file | ||
we must persist the data using `MemoryZipStore.write_to_file`. | ||
The main requirement is to implement the signature of Zarr.Types.IO. | ||
We use Zarr_lwt Deferred module for `Deferred` so that the store can be | ||
Lwt-aware. | ||
To compile & run this example execute the command | ||
dune exec -- examples/inmemory_zipstore.exe | ||
in your shell at the root of this project. *) | ||
|
||
module MemoryZipStore : sig | ||
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t | ||
(*val create : ?level:Zipc_deflate.level -> string -> t *) | ||
val open_store : ?level:Zipc_deflate.level -> string -> t | ||
val write_to_file : t -> unit Deferred.t | ||
end = struct | ||
module M = Map.Make(String) | ||
|
||
module Z = struct | ||
module Deferred = Zarr_lwt.Deferred | ||
open Deferred.Infix | ||
|
||
type t = | ||
{mutable ic : Zipc.t | ||
;mutex : Lwt_mutex.t | ||
;level : Zipc_deflate.level | ||
;path : string} | ||
|
||
let is_member t key = | ||
Deferred.return @@ Zipc.mem key t.ic | ||
|
||
let size t key = | ||
Deferred.return @@ | ||
match Zipc.find key t.ic with | ||
| None -> 0 | ||
| Some m -> | ||
match Zipc.Member.kind m with | ||
| Zipc.Member.Dir -> 0 | ||
| Zipc.Member.File f -> Zipc.File.decompressed_size f | ||
|
||
let get t key = | ||
Deferred.return @@ | ||
match Zipc.find key t.ic with | ||
| None -> raise (Zarr.Storage.Key_not_found key) | ||
| Some m -> | ||
match Zipc.Member.kind m with | ||
| Zipc.Member.Dir -> failwith "cannot get size of directory." | ||
| Zipc.Member.File f -> | ||
match Zipc.File.to_binary_string f with | ||
| Error e -> failwith e | ||
| Ok s -> s | ||
|
||
let get_partial_values t key ranges = | ||
get t key >>= fun data -> | ||
let size = String.length data in | ||
ranges |> Lwt_list.map_p @@ fun (offset, len) -> | ||
Deferred.return | ||
(match len with | ||
| None -> String.sub data offset (size - offset) | ||
| Some l -> String.sub data offset l) | ||
|
||
let list t = | ||
Deferred.return @@ Zipc.fold | ||
(fun m acc -> | ||
match Zipc.Member.kind m with | ||
| Zipc.Member.Dir -> acc | ||
| Zipc.Member.File _ -> Zipc.Member.path m :: acc) t.ic [] | ||
|
||
let list_dir t prefix = | ||
let module StrSet = Zarr.Util.StrSet in | ||
let n = String.length prefix in | ||
let m = Zipc.to_string_map t.ic in | ||
let prefs, keys = | ||
M.fold | ||
(fun key v ((l, r) as acc) -> | ||
match Zipc.Member.kind v with | ||
| Zipc.Member.Dir -> acc | ||
| Zipc.Member.File _ -> | ||
let pred = String.starts_with ~prefix key in | ||
match key with | ||
| k when pred && String.contains_from k n '/' -> | ||
StrSet.add String.(sub k 0 @@ 1 + index_from k n '/') l, r | ||
| k when pred -> l, k :: r | ||
| _ -> acc) m (StrSet.empty, []) | ||
in Deferred.return (keys, StrSet.elements prefs) | ||
|
||
let set t key value = | ||
match Zipc.File.deflate_of_binary_string ~level:t.level value with | ||
| Error e -> failwith e | ||
| Ok f -> | ||
match Zipc.Member.(make ~path:key @@ File f) with | ||
| Error e -> failwith e | ||
| Ok m -> | ||
Lwt_mutex.with_lock | ||
t.mutex | ||
(fun () -> | ||
t.ic <- Zipc.add m t.ic; | ||
Deferred.return_unit) | ||
|
||
let set_partial_values t key ?(append=false) rv = | ||
let f = | ||
if append then | ||
fun acc (_, v) -> | ||
Deferred.return @@ acc ^ v | ||
else | ||
fun acc (rs, v) -> | ||
let s = Bytes.of_string acc in | ||
String.(length v |> Bytes.blit_string v 0 s rs); | ||
Deferred.return @@ Bytes.to_string s | ||
in | ||
match Zipc.Member.kind (Option.get @@ Zipc.find key t.ic) with | ||
| Zipc.Member.Dir -> Deferred.return_unit | ||
| Zipc.Member.File file -> | ||
match Zipc.File.to_binary_string file with | ||
| Error e -> failwith e | ||
| Ok s -> Deferred.fold_left f s rv >>= set t key | ||
|
||
let erase t key = | ||
Lwt_mutex.with_lock | ||
t.mutex | ||
(fun () -> | ||
t.ic <- Zipc.remove key t.ic; | ||
Deferred.return_unit) | ||
|
||
let erase_prefix t prefix = | ||
let m = Zipc.to_string_map t.ic in | ||
let m' = | ||
M.filter_map | ||
(fun k v -> | ||
if String.starts_with ~prefix k then None else Some v) m in | ||
Lwt_mutex.with_lock | ||
t.mutex | ||
(fun () -> | ||
t.ic <- Zipc.of_string_map m'; | ||
Deferred.return_unit) | ||
end | ||
|
||
(* this functor generates the public signature of our Zip file store. *) | ||
include Zarr.Storage.Make(Z) | ||
|
||
(* now we create functions to open and close the store. *) | ||
|
||
(*let create ?(level=`Default) path = Z.{ic = Zipc.empty; level; path} *) | ||
|
||
let open_store ?(level=`Default) path = | ||
match Zipc.of_binary_string In_channel.(with_open_bin path input_all) with | ||
| Ok ic -> Z.{ic; level; path; mutex = Lwt_mutex.create ()} | ||
| Error e -> failwith e | ||
|
||
let write_to_file (t : Z.t) = | ||
Lwt_io.with_file | ||
~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK] | ||
~mode:Lwt_io.Output | ||
t.path | ||
(fun oc -> | ||
let open Lwt.Infix in | ||
match Zipc.to_binary_string t.ic with | ||
| Error e -> failwith e | ||
| Ok s -> Lwt_io.write oc s >>= fun () -> Lwt_io.flush oc) | ||
end | ||
|
||
let _ = | ||
Lwt_main.run @@ begin | ||
let open Zarr.Node in | ||
let open MemoryZipStore.Deferred.Infix in | ||
|
||
let printlist = [%show: string list] in | ||
let store = MemoryZipStore.open_store "examples/data/testdata.zip" in | ||
MemoryZipStore.find_all_nodes store >>= fun (xs, _) -> | ||
print_endline @@ "All array nodes: " ^ printlist (List.map ArrayNode.to_path xs); | ||
let anode = List.hd @@ List.filter | ||
(fun node -> ArrayNode.to_path node = "/some/group/name") xs in | ||
let slice = Owl_types.[|R [0; 20]; I 10; R []|] in | ||
MemoryZipStore.read_array store anode slice Bigarray.Char >>= fun x -> | ||
print_string @@ "BEFORE: " ^ Owl_pretty.dsnda_to_string x; | ||
let x' = | ||
Owl.Dense.Ndarray.Generic.map | ||
(fun _ -> Owl_stats_dist.uniform_int_rvs ~a:0 ~b:255 |> Char.chr) x in | ||
MemoryZipStore.write_array store anode slice x' >>= fun () -> | ||
MemoryZipStore.read_array store anode slice Bigarray.Char >>= fun y -> | ||
print_string @@ "AFTER: " ^ Owl_pretty.dsnda_to_string y; | ||
MemoryZipStore.write_to_file store >>| fun () -> | ||
print_endline "Zip store has been update." | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
(* This module implements a Read-only Zip file zarr store that is Eio-aware. | ||
The main requirement is to implement the signature of Zarr.Types.IO. | ||
We use Zarr_eio's Deferred module for `Deferred` so that the store can be | ||
Eio-aware. Since Zip stores cannot have files updated or removed, we only | ||
implement the get_* and list_* family of functions and raise an | ||
Not_implemented exception for the set_* and erase_* family of functions. | ||
This effectively allows us to create a read-only store since calling any | ||
of the following functions would result in an `Not_implemented` exception: | ||
- ReadOnlyZipStore.create_group | ||
- ReadOnlyZipStore.create_array | ||
- ReadOnlyZipStore.erase_group_node | ||
- ReadOnlyZipStore.erase_array_node | ||
- ReadOnlyZipStore.erase_all_nodes | ||
- ReadOnlyZipStore.write_array | ||
- ReadOnlyZipStore.reshape | ||
Below we show how to implement this custom Zarr Store. | ||
To compile & run this example execute the command | ||
dune exec -- examples/zipstore.exe | ||
in your shell at the root of this project. *) | ||
|
||
module ReadOnlyZipStore : sig | ||
exception Not_implemented | ||
|
||
include Zarr.Storage.STORE with type 'a Deferred.t = 'a | ||
val open_store : string -> t | ||
val close : t -> unit | ||
|
||
end = struct | ||
exception Not_implemented | ||
|
||
module Z = struct | ||
module Deferred = Zarr_eio.Deferred | ||
open Deferred.Infix | ||
|
||
type t = Zip.in_file | ||
|
||
let is_member t key = | ||
match Zip.find_entry t key with | ||
| exception Not_found -> false | ||
| _ -> true | ||
|
||
let size t key = | ||
match Zip.find_entry t key with | ||
| e -> e.uncompressed_size | ||
| exception Not_found -> 0 | ||
|
||
let get t key = | ||
match Zip.find_entry t key with | ||
| e -> Zip.read_entry t e | ||
| exception Not_found -> raise (Zarr.Storage.Key_not_found key) | ||
|
||
let get_partial_values t key ranges = | ||
get t key >>= fun data -> | ||
let size = String.length data in | ||
ranges |> Eio.Fiber.List.map @@ fun (offset, len) -> | ||
match len with | ||
| None -> String.sub data offset (size - offset) | ||
| Some l -> String.sub data offset l | ||
|
||
let list t = | ||
Zip.entries t |> Eio.Fiber.List.filter_map @@ function | ||
| (e : Zip.entry) when not e.is_directory -> Some e.filename | ||
| _ -> None | ||
|
||
let list_dir t prefix = | ||
let module StrSet = Zarr.Util.StrSet in | ||
let n = String.length prefix in | ||
let prefs, keys = | ||
List.fold_left | ||
(fun ((l, r) as acc) -> function | ||
| (e : Zip.entry) when e.is_directory -> acc | ||
| e when not @@ String.starts_with ~prefix e.filename -> acc | ||
| e when String.contains_from e.filename n '/' -> | ||
let key = e.filename in | ||
let pre = String.sub key 0 @@ 1 + String.index_from key n '/' in | ||
StrSet.add pre l, r | ||
| e -> l, e.filename :: r) (StrSet.empty, []) @@ Zip.entries t | ||
in keys, StrSet.elements prefs | ||
|
||
let set _ = raise Not_implemented | ||
|
||
let set_partial_values _ = raise Not_implemented | ||
|
||
let erase _ = raise Not_implemented | ||
|
||
let erase_prefix _ = raise Not_implemented | ||
end | ||
|
||
(* this functor generates the public signature of our Zip file store. *) | ||
include Zarr.Storage.Make(Z) | ||
|
||
(* now we create functions to open and close the store. *) | ||
let open_store path = Zip.open_in path | ||
let close = Zip.close_in | ||
end | ||
|
||
let _ = | ||
Eio_main.run @@ fun _ -> | ||
let open Zarr.Metadata in | ||
let open Zarr.Node in | ||
|
||
let store = ReadOnlyZipStore.open_store "examples/data/testdata.zip" in | ||
let xs, _ = ReadOnlyZipStore.find_all_nodes store in | ||
let anode = List.hd @@ Eio.Fiber.List.filter | ||
(fun node -> ArrayNode.to_path node = "/some/group/name") xs in | ||
let meta = ReadOnlyZipStore.array_metadata store anode in | ||
let slice = Array.map (Fun.const @@ Owl_types.R []) (ArrayMetadata.shape meta) in | ||
let arr = ReadOnlyZipStore.read_array store anode slice Bigarray.Char in | ||
print_string @@ Owl_pretty.dsnda_to_string arr; | ||
try ReadOnlyZipStore.write_array store anode slice arr with | ||
| ReadOnlyZipStore.Not_implemented -> print_endline "Store is read-only"; | ||
ReadOnlyZipStore.close store |