Skip to content

Commit

Permalink
Improve correctness of shardingIndexed partial read/write.
Browse files Browse the repository at this point in the history
This ensures sharding indexed codec's partial reads and writes are
implemented correctly such that they reliably read sharded data written
by other implementations.
  • Loading branch information
zoj613 committed Oct 1, 2024
1 parent 797ed42 commit aa72566
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 158 deletions.
27 changes: 16 additions & 11 deletions examples/inmemory_zipstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,28 @@ end = struct
then Deferred.return_unit else set t key value

let rec set_partial_values t key ?(append=false) rv =
let f =
if append then
fun acc (_, v) ->
Deferred.return @@ acc ^ v
else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
String.(length v |> Bytes.blit_string v 0 s rs);
Deferred.return @@ Bytes.unsafe_to_string s
in
let z = Atomic.get t.ic in
match Zipc.Member.kind (Option.get @@ Zipc.find key z) with
let mem = match Zipc.find key z with
| Some m -> m
| None ->
let empty = Result.fold
~error:failwith ~ok:Fun.id @@ Zipc.File.stored_of_binary_string String.empty in
Result.fold
~error:failwith ~ok:Fun.id @@ Zipc.Member.make ~path:key (Zipc.Member.File empty)
in
match Zipc.Member.kind mem with
| Zipc.Member.Dir -> Deferred.return_unit
| Zipc.Member.File file ->
match Zipc.File.to_binary_string file with
| Error e -> failwith e
| Ok s ->
let f = if append || s = String.empty then
fun acc (_, v) -> Deferred.return @@ acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
String.(length v |> Bytes.blit_string v 0 s rs);
Deferred.return @@ Bytes.unsafe_to_string s
in
let* value = Deferred.fold_left f s rv in
match Zipc.File.deflate_of_binary_string ~level:t.level value with
| Error e -> failwith e
Expand Down
24 changes: 16 additions & 8 deletions examples/picos_fs_store.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ end = struct
PU.mkdir parent_dir perm

let size t key =
let fd = PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm in
Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () ->
PU.set_nonblock fd;
PU.(fstat fd).st_size
match PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm with
| exception Unix.Unix_error (Unix.ENOENT, "open", _) -> 0
| fd ->
Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () ->
PU.set_nonblock fd;
PU.(fstat fd).st_size

let get t key =
let fd = PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm in
Expand Down Expand Up @@ -72,8 +74,14 @@ end = struct
ignore @@ PU.write_substring fd v 0 (String.length v)

let set_partial_values t key ?(append=false) rvs =
let flags = if append then PU.[O_APPEND; O_WRONLY] else [PU.O_WRONLY] in
let fd = PU.openfile (key_to_fspath t key) flags t.perm in
let flags = match append with
| false -> PU.[O_WRONLY; O_CREAT]
| true -> PU.[O_APPEND; O_WRONLY; O_CREAT]
in
let p = key_to_fspath t key in
create_parent_dir p t.perm;
let fd = PU.openfile p flags t.perm in
Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () ->
rvs |> List.iter @@ fun (ofs, v) ->
if append then ignore @@ PU.lseek fd 0 PU.SEEK_END
else ignore @@ PU.lseek fd ofs PU.SEEK_SET;
Expand Down Expand Up @@ -113,8 +121,8 @@ end = struct
Fun.protect ~finally:(fun () -> PU.closedir h) @@ fun () ->
entries h [] |> List.partition_map @@ fun x ->
match Filename.concat dir x with
| p when Sys.is_directory p -> Either.right @@ (fspath_to_key t p) ^ "/"
| p -> Either.left @@ fspath_to_key t p
| p when (PU.stat p).st_kind = PU.S_DIR -> Either.right @@ (fspath_to_key t p) ^ "/"
| p -> Either.left @@ fspath_to_key t p

let rename t k k' = PU.rename (key_to_fspath t k) (key_to_fspath t k')
end
Expand Down
34 changes: 21 additions & 13 deletions zarr-eio/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ module FilesystemStore = struct
let key_to_fspath t key = Eio.Path.(t.root / key)

let size t key =
Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow ->
Optint.Int63.to_int @@ Eio.File.size flow

let get t key = match Eio.Path.load @@ key_to_fspath t key with
| exception Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) ->
try
Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow ->
Optint.Int63.to_int @@ Eio.File.size flow
with
| Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) -> 0

let get t key =
try Eio.Path.load @@ key_to_fspath t key with
| Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) ->
raise @@ Zarr.Storage.Key_not_found key
| v -> v

let get_partial_values t key ranges =
Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow ->
Expand All @@ -33,26 +36,31 @@ module FilesystemStore = struct
let a' = Option.fold ~none:(a + size - s) ~some:(Int.add a) l in
a', (Optint.Int63.of_int s, a, a' - a)) 0 ranges in
let buffer = Bigarray.Array1.create Char C_layout size' in
ranges' |> Eio.Fiber.List.map @@ fun (fo, off, len) ->
ranges' |> List.map @@ fun (fo, off, len) ->
let file_offset = Eio.File.seek flow fo `Set in
let buf = Cstruct.of_bigarray ~off ~len buffer in
Eio.File.pread_exact flow ~file_offset [buf];
Cstruct.to_string buf

let set t key value =
let fp = key_to_fspath t key in
let create_parent_dir fp perm =
Option.fold
~some:(fun (p, _) -> Eio.Path.mkdirs ~exists_ok:true ~perm p)
~none:()
~some:(fun (p, _) -> Eio.Path.mkdirs ~exists_ok:true ~perm:t.perm p)
(Eio.Path.split fp);
(Eio.Path.split fp)

let set t key value =
let fp = key_to_fspath t key in
create_parent_dir fp t.perm;
Eio.Path.save ~create:(`Or_truncate t.perm) fp value

let set_partial_values t key ?(append=false) rvs =
let fp = key_to_fspath t key in
create_parent_dir fp t.perm;
let l = List.fold_left (fun a (_, s) -> Int.max a (String.length s)) 0 rvs in
let buffer = Bigarray.Array1.create Char C_layout l in
let allocator len = Cstruct.of_bigarray ~off:0 ~len buffer in
Eio.Path.with_open_out ~append ~create:`Never (key_to_fspath t key) @@ fun flow ->
rvs |> Eio.Fiber.List.iter @@ fun (ofs, str) ->
Eio.Path.with_open_out ~append ~create:(`If_missing t.perm) fp @@ fun flow ->
rvs |> List.iter @@ fun (ofs, str) ->
let file_offset = Eio.File.seek flow (Optint.Int63.of_int ofs) `Set in
Eio.File.pwrite_all flow ~file_offset [Cstruct.of_string ~allocator str]

Expand Down
5 changes: 4 additions & 1 deletion zarr-eio/test/test_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ let test_storage
Ndarray.fill exp Complex.{re=0.; im=3.0};
Array.write store anode slice exp;
let got = Array.read store anode slice Complex32 in
assert_equal exp got)
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]];

let child = Node.Group.of_path "/some/child/group" in
Expand Down
25 changes: 14 additions & 11 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ module FilesystemStore = struct
| true -> Lwt.return_unit

let size t key =
Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int
Lwt.catch
(fun () -> Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int)
(fun _ -> Deferred.return 0)

let get t key =
let* bsize = size t key in
Expand Down Expand Up @@ -76,17 +78,18 @@ module FilesystemStore = struct

let set_partial_values t key ?(append=false) rvs =
let l = List.fold_left (fun a (_, s) -> Int.max a (String.length s)) 0 rvs in
let flags = match append with
| false -> Unix.[O_WRONLY; O_CREAT]
| true -> Unix.[O_APPEND; O_WRONLY; O_CREAT]
in
let fp = key_to_fspath t key in
let* () = create_parent_dir fp t.perm in
Lwt_io.with_file
~buffer:(Lwt_bytes.create l)
~flags:(if append then Unix.[O_APPEND; O_WRONLY] else [Unix.O_WRONLY])
~perm:t.perm
~mode:Lwt_io.Output
(key_to_fspath t key)
@@ fun oc ->
Lwt_list.iter_s
(fun (ofs, value) ->
let* () = Lwt_io.set_position oc @@ Int64.of_int ofs in
Lwt_io.write oc value) rvs
~buffer:(Lwt_bytes.create l) ~flags ~perm:t.perm ~mode:Lwt_io.Output fp @@ fun oc ->
Lwt_list.iter_s
(fun (ofs, value) ->
let* () = Lwt_io.set_position oc @@ Int64.of_int ofs in
Lwt_io.write oc value) rvs

let rec walk t acc dir =
Lwt_stream.fold_s
Expand Down
7 changes: 5 additions & 2 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ let test_storage
assert_equal exp arr;
Ndarray.fill exp Complex.{re=0.; im=3.0};
Array.write store anode slice exp >>= fun () ->
Array.read store anode slice Complex32 >>| fun got ->
assert_equal exp got)
Array.read store anode slice Complex32 >>= fun got ->
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]] >>= fun () ->

let child = Node.Group.of_path "/some/child/group" in
Expand Down
23 changes: 14 additions & 9 deletions zarr-sync/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ module FilesystemStore = struct
let s = In_channel.length ic |> Int64.to_int in
ranges |> List.map @@ fun (ofs, len) ->
In_channel.seek ic @@ Int64.of_int ofs;
let l = Option.fold ~none:(s - ofs) ~some:Fun.id len in
Option.get @@ In_channel.really_input_string ic l
really_input_string ic @@ Option.fold ~none:(s - ofs) ~some:Fun.id len

let set t key v =
let p = key_to_fspath t key in
Expand All @@ -35,21 +34,27 @@ module FilesystemStore = struct
Out_channel.(with_open_gen f t.perm p @@ fun oc -> output_string oc v; flush oc)

let set_partial_values t key ?(append=false) rvs =
let f = [if append then Open_append else Open_wronly] in
let p = key_to_fspath t key in
Out_channel.with_open_gen f t.perm p @@ fun oc ->
rvs |> List.iter (fun (rs, value) ->
Out_channel.seek oc @@ Int64.of_int rs;
Out_channel.output_string oc value);
Zarr.Util.create_parent_dir p t.perm;
let flags = match append with
| false -> [Open_creat; Open_wronly]
| true -> [Open_append; Open_creat; Open_wronly]
in
Out_channel.with_open_gen flags t.perm p @@ fun oc ->
List.iter
(fun (rs, value) ->
Out_channel.seek oc @@ Int64.of_int rs;
Out_channel.output_string oc value) rvs;
Out_channel.flush oc

let is_member t key = Sys.file_exists @@ key_to_fspath t key

let erase t key = Sys.remove @@ key_to_fspath t key

let size t key =
In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length)
|> Int64.to_int
match In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length) with
| exception Sys_error _ -> 0
| s -> Int64.to_int s

let rec walk t acc dir =
List.fold_left
Expand Down
16 changes: 8 additions & 8 deletions zarr-sync/test/test_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ let test_storage
;codecs = [`Bytes LE]} in
let anode = Node.Array.(gnode / "arrnode") in
let slice = [|R [|0; 20|]; I 10; R [|0; 29|]|] in
let bigger_slice = [|R [|0; 21|]; L [|9; 10|] ; R [|0; 30|]|] in

List.iter
(fun codecs ->
Expand All @@ -65,10 +66,13 @@ let test_storage
Ndarray.fill exp Complex.{re=2.0; im=0.};
Array.write store anode slice exp;
let got = Array.read store anode slice Complex32 in
(* test if a bigger slice containing new elements can be read from store *)
let _ = Array.read store anode bigger_slice Complex32 in
assert_equal exp got;
Ndarray.fill exp Complex.{re=0.; im=3.0};
Array.write store anode slice exp;
(* test writing a bigger slice to store *)
Array.write store anode bigger_slice @@ Ndarray.init Complex32 [|22; 2; 31|] (Fun.const Complex.{re=0.; im=3.0});
let got = Array.read store anode slice Complex32 in
Ndarray.fill exp Complex.{re=0.; im=3.0};
assert_equal exp got;
Array.delete store anode)
[[`ShardingIndexed cfg]; [`ShardingIndexed cfg2]];
Expand Down Expand Up @@ -163,12 +167,7 @@ let test_storage

let _ =
run_test_tt_main @@ ("Run Zarr sync API tests" >::: [

"test in-memory store" >::
(fun _ -> test_storage (module MemoryStore) @@ MemoryStore.create ())
;

"test filesystem store" >::
"test sync-based stores" >::
(fun _ ->
let rand_num = string_of_int @@ Random.int 1_000_000 in
let tmp_dir = Filename.(concat (get_temp_dir_name ()) (rand_num ^ ".zarr")) in
Expand Down Expand Up @@ -204,5 +203,6 @@ let _ =
(Zarr.Storage.Not_a_filesystem_store fn)
(fun () -> FilesystemStore.open_store fn);

test_storage (module MemoryStore) @@ MemoryStore.create ();
test_storage (module FilesystemStore) s)
])
Loading

0 comments on commit aa72566

Please sign in to comment.