diff --git a/examples/inmemory_zipstore.ml b/examples/inmemory_zipstore.ml index 566411e..089024a 100644 --- a/examples/inmemory_zipstore.ml +++ b/examples/inmemory_zipstore.ml @@ -92,23 +92,28 @@ end = struct then Deferred.return_unit else set t key value let rec set_partial_values t key ?(append=false) rv = - let f = - if append then - fun acc (_, v) -> - Deferred.return @@ acc ^ v - else - fun acc (rs, v) -> - let s = Bytes.unsafe_of_string acc in - String.(length v |> Bytes.blit_string v 0 s rs); - Deferred.return @@ Bytes.unsafe_to_string s - in let z = Atomic.get t.ic in - match Zipc.Member.kind (Option.get @@ Zipc.find key z) with + let mem = match Zipc.find key z with + | Some m -> m + | None -> + let empty = Result.fold + ~error:failwith ~ok:Fun.id @@ Zipc.File.stored_of_binary_string String.empty in + Result.fold + ~error:failwith ~ok:Fun.id @@ Zipc.Member.make ~path:key (Zipc.Member.File empty) + in + match Zipc.Member.kind mem with | Zipc.Member.Dir -> Deferred.return_unit | Zipc.Member.File file -> match Zipc.File.to_binary_string file with | Error e -> failwith e | Ok s -> + let f = if append || s = String.empty then + fun acc (_, v) -> Deferred.return @@ acc ^ v else + fun acc (rs, v) -> + let s = Bytes.unsafe_of_string acc in + String.(length v |> Bytes.blit_string v 0 s rs); + Deferred.return @@ Bytes.unsafe_to_string s + in let* value = Deferred.fold_left f s rv in match Zipc.File.deflate_of_binary_string ~level:t.level value with | Error e -> failwith e diff --git a/examples/picos_fs_store.ml b/examples/picos_fs_store.ml index 4fc9d7d..ba1ee07 100644 --- a/examples/picos_fs_store.ml +++ b/examples/picos_fs_store.ml @@ -33,10 +33,12 @@ end = struct PU.mkdir parent_dir perm let size t key = - let fd = PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm in - Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () -> - PU.set_nonblock fd; - PU.(fstat fd).st_size + match PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm with + | exception Unix.Unix_error (Unix.ENOENT, "open", _) -> 0 + | fd -> + Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () -> + PU.set_nonblock fd; + PU.(fstat fd).st_size let get t key = let fd = PU.openfile (key_to_fspath t key) [PU.O_RDONLY] t.perm in @@ -72,8 +74,14 @@ end = struct ignore @@ PU.write_substring fd v 0 (String.length v) let set_partial_values t key ?(append=false) rvs = - let flags = if append then PU.[O_APPEND; O_WRONLY] else [PU.O_WRONLY] in - let fd = PU.openfile (key_to_fspath t key) flags t.perm in + let flags = match append with + | false -> PU.[O_WRONLY; O_CREAT] + | true -> PU.[O_APPEND; O_WRONLY; O_CREAT] + in + let p = key_to_fspath t key in + create_parent_dir p t.perm; + let fd = PU.openfile p flags t.perm in + Fun.protect ~finally:(fun () -> PU.close fd) @@ fun () -> rvs |> List.iter @@ fun (ofs, v) -> if append then ignore @@ PU.lseek fd 0 PU.SEEK_END else ignore @@ PU.lseek fd ofs PU.SEEK_SET; @@ -113,8 +121,8 @@ end = struct Fun.protect ~finally:(fun () -> PU.closedir h) @@ fun () -> entries h [] |> List.partition_map @@ fun x -> match Filename.concat dir x with - | p when Sys.is_directory p -> Either.right @@ (fspath_to_key t p) ^ "/" - | p -> Either.left @@ fspath_to_key t p + | p when (PU.stat p).st_kind = PU.S_DIR -> Either.right @@ (fspath_to_key t p) ^ "/" + | p -> Either.left @@ fspath_to_key t p let rename t k k' = PU.rename (key_to_fspath t k) (key_to_fspath t k') end diff --git a/zarr-eio/src/storage.ml b/zarr-eio/src/storage.ml index aed67b3..5779d8f 100644 --- a/zarr-eio/src/storage.ml +++ b/zarr-eio/src/storage.ml @@ -17,56 +17,72 @@ module FilesystemStore = struct let key_to_fspath t key = Eio.Path.(t.root / key) let size t key = - Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow -> - Optint.Int63.to_int @@ Eio.File.size flow - - let get t key = match Eio.Path.load @@ key_to_fspath t key with - | exception Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) -> + try + Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow -> + Optint.Int63.to_int @@ Eio.File.size flow + with + | Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) -> 0 + + let get t key = + try Eio.Path.load @@ key_to_fspath t key with + | Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) -> raise @@ Zarr.Storage.Key_not_found key - | v -> v let get_partial_values t key ranges = + let add ~size a (s, l) = + let a' = Option.fold ~none:(a + size - s) ~some:(Int.add a) l in + a', (Optint.Int63.of_int s, a, a' - a) + in + let read ~flow ~buffer (file_offset, off, len) = + let _ = Eio.File.seek flow file_offset `Set in + let buf = Cstruct.of_bigarray ~off ~len buffer in + Eio.File.pread_exact flow ~file_offset [buf]; + Cstruct.to_string buf + in Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow -> let size = Optint.Int63.to_int @@ Eio.File.size flow in - let size', ranges' = - List.fold_left_map (fun a (s, l) -> - let a' = Option.fold ~none:(a + size - s) ~some:(Int.add a) l in - a', (Optint.Int63.of_int s, a, a' - a)) 0 ranges in + let size', ranges' = List.fold_left_map (add ~size) 0 ranges in let buffer = Bigarray.Array1.create Char C_layout size' in - ranges' |> Eio.Fiber.List.map @@ fun (fo, off, len) -> - let file_offset = Eio.File.seek flow fo `Set in - let buf = Cstruct.of_bigarray ~off ~len buffer in - Eio.File.pread_exact flow ~file_offset [buf]; - Cstruct.to_string buf + List.map (read ~flow ~buffer) ranges' - let set t key value = - let fp = key_to_fspath t key in + let create_parent_dir fp perm = Option.fold + ~some:(fun (p, _) -> Eio.Path.mkdirs ~exists_ok:true ~perm p) ~none:() - ~some:(fun (p, _) -> Eio.Path.mkdirs ~exists_ok:true ~perm:t.perm p) - (Eio.Path.split fp); + (Eio.Path.split fp) + + let set t key value = + let fp = key_to_fspath t key in + create_parent_dir fp t.perm; Eio.Path.save ~create:(`Or_truncate t.perm) fp value let set_partial_values t key ?(append=false) rvs = + let write = if append then + fun ~flow ~allocator (_, str) -> + Eio.File.pwrite_all flow ~file_offset:Optint.Int63.max_int [Cstruct.of_string ~allocator str] + else + fun ~flow ~allocator (ofs, str) -> + let file_offset = Eio.File.seek flow (Optint.Int63.of_int ofs) `Set in + Eio.File.pwrite_all flow ~file_offset [Cstruct.of_string ~allocator str] + in let l = List.fold_left (fun a (_, s) -> Int.max a (String.length s)) 0 rvs in let buffer = Bigarray.Array1.create Char C_layout l in let allocator len = Cstruct.of_bigarray ~off:0 ~len buffer in - Eio.Path.with_open_out ~append ~create:`Never (key_to_fspath t key) @@ fun flow -> - rvs |> Eio.Fiber.List.iter @@ fun (ofs, str) -> - let file_offset = Eio.File.seek flow (Optint.Int63.of_int ofs) `Set in - Eio.File.pwrite_all flow ~file_offset [Cstruct.of_string ~allocator str] + let fp = key_to_fspath t key in + create_parent_dir fp t.perm; + Eio.Path.with_open_out ~append ~create:(`If_missing t.perm) fp @@ fun flow -> + List.iter (write ~flow ~allocator) rvs let rec walk t acc dir = - List.fold_left - (fun a x -> - match Eio.Path.(dir / x) with - | p when Eio.Path.is_directory p -> walk t a p - | p -> (fspath_to_key t p) :: a) acc (Eio.Path.read_dir dir) + let add ~t ~dir a x = match Eio.Path.(dir / x) with + | p when Eio.Path.is_directory p -> walk t a p + | p -> (fspath_to_key t p) :: a + in + List.fold_left (add ~t ~dir) acc @@ Eio.Path.read_dir dir let list t = walk t [] t.root - let list_prefix t prefix = - walk t [] (key_to_fspath t prefix) + let list_prefix t prefix = walk t [] @@ key_to_fspath t prefix let is_member t key = Eio.Path.is_file @@ key_to_fspath t key @@ -82,13 +98,13 @@ module FilesystemStore = struct else Eio.Path.rmtree ~missing_ok:true prefix let list_dir t prefix = + let choose ~t ~dir x = match Eio.Path.(dir / x) with + | p when Eio.Path.is_directory p -> + Either.right @@ (fspath_to_key t p) ^ "/" + | p -> Either.left @@ fspath_to_key t p + in let dir = key_to_fspath t prefix in - List.partition_map - (fun x -> - match Eio.Path.(dir / x) with - | p when Eio.Path.is_directory p -> - Either.right @@ (fspath_to_key t p) ^ "/" - | p -> Either.left @@ fspath_to_key t p) (Eio.Path.read_dir dir) + List.partition_map (choose ~t ~dir) @@ Eio.Path.read_dir dir let rename t k k' = Eio.Path.rename (key_to_fspath t k) (key_to_fspath t k') diff --git a/zarr-eio/test/test_eio.ml b/zarr-eio/test/test_eio.ml index 99cf5a4..33f1968 100644 --- a/zarr-eio/test/test_eio.ml +++ b/zarr-eio/test/test_eio.ml @@ -65,7 +65,10 @@ let test_storage Ndarray.fill exp Complex.{re=0.; im=3.0}; Array.write store anode slice exp; let got = Array.read store anode slice Complex32 in - assert_equal exp got) + assert_equal exp got; + match codecs with + | [`ShardingIndexed _] -> Array.delete store anode + | _ -> Deferred.return_unit) [[`ShardingIndexed cfg]; [`Bytes BE]]; let child = Node.Group.of_path "/some/child/group" in diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml index 1f32bd7..0c95293 100644 --- a/zarr-lwt/src/storage.ml +++ b/zarr-lwt/src/storage.ml @@ -26,7 +26,9 @@ module FilesystemStore = struct | true -> Lwt.return_unit let size t key = - Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int + Lwt.catch + (fun () -> Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int) + (fun _ -> Deferred.return 0) let get t key = let* bsize = size t key in @@ -76,17 +78,18 @@ module FilesystemStore = struct let set_partial_values t key ?(append=false) rvs = let l = List.fold_left (fun a (_, s) -> Int.max a (String.length s)) 0 rvs in + let flags = match append with + | false -> Unix.[O_WRONLY; O_CREAT] + | true -> Unix.[O_APPEND; O_WRONLY; O_CREAT] + in + let fp = key_to_fspath t key in + let* () = create_parent_dir fp t.perm in Lwt_io.with_file - ~buffer:(Lwt_bytes.create l) - ~flags:(if append then Unix.[O_APPEND; O_WRONLY] else [Unix.O_WRONLY]) - ~perm:t.perm - ~mode:Lwt_io.Output - (key_to_fspath t key) - @@ fun oc -> - Lwt_list.iter_s - (fun (ofs, value) -> - let* () = Lwt_io.set_position oc @@ Int64.of_int ofs in - Lwt_io.write oc value) rvs + ~buffer:(Lwt_bytes.create l) ~flags ~perm:t.perm ~mode:Lwt_io.Output fp @@ fun oc -> + Lwt_list.iter_s + (fun (ofs, value) -> + let* () = Lwt_io.set_position oc @@ Int64.of_int ofs in + Lwt_io.write oc value) rvs let rec walk t acc dir = Lwt_stream.fold_s diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml index 164d1ea..1fa91b1 100644 --- a/zarr-lwt/test/test_lwt.ml +++ b/zarr-lwt/test/test_lwt.ml @@ -65,8 +65,11 @@ let test_storage assert_equal exp arr; Ndarray.fill exp Complex.{re=0.; im=3.0}; Array.write store anode slice exp >>= fun () -> - Array.read store anode slice Complex32 >>| fun got -> - assert_equal exp got) + Array.read store anode slice Complex32 >>= fun got -> + assert_equal exp got; + match codecs with + | [`ShardingIndexed _] -> Array.delete store anode + | _ -> Deferred.return_unit) [[`ShardingIndexed cfg]; [`Bytes BE]] >>= fun () -> let child = Node.Group.of_path "/some/child/group" in diff --git a/zarr-sync/src/storage.ml b/zarr-sync/src/storage.ml index e960368..2604413 100644 --- a/zarr-sync/src/storage.ml +++ b/zarr-sync/src/storage.ml @@ -25,8 +25,7 @@ module FilesystemStore = struct let s = In_channel.length ic |> Int64.to_int in ranges |> List.map @@ fun (ofs, len) -> In_channel.seek ic @@ Int64.of_int ofs; - let l = Option.fold ~none:(s - ofs) ~some:Fun.id len in - Option.get @@ In_channel.really_input_string ic l + really_input_string ic @@ Option.fold ~none:(s - ofs) ~some:Fun.id len let set t key v = let p = key_to_fspath t key in @@ -35,12 +34,17 @@ module FilesystemStore = struct Out_channel.(with_open_gen f t.perm p @@ fun oc -> output_string oc v; flush oc) let set_partial_values t key ?(append=false) rvs = - let f = [if append then Open_append else Open_wronly] in let p = key_to_fspath t key in - Out_channel.with_open_gen f t.perm p @@ fun oc -> - rvs |> List.iter (fun (rs, value) -> - Out_channel.seek oc @@ Int64.of_int rs; - Out_channel.output_string oc value); + Zarr.Util.create_parent_dir p t.perm; + let flags = match append with + | false -> [Open_creat; Open_wronly] + | true -> [Open_append; Open_creat; Open_wronly] + in + Out_channel.with_open_gen flags t.perm p @@ fun oc -> + List.iter + (fun (rs, value) -> + Out_channel.seek oc @@ Int64.of_int rs; + Out_channel.output_string oc value) rvs; Out_channel.flush oc let is_member t key = Sys.file_exists @@ key_to_fspath t key @@ -48,8 +52,9 @@ module FilesystemStore = struct let erase t key = Sys.remove @@ key_to_fspath t key let size t key = - In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length) - |> Int64.to_int + match In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length) with + | exception Sys_error _ -> 0 + | s -> Int64.to_int s let rec walk t acc dir = List.fold_left diff --git a/zarr-sync/test/test_sync.ml b/zarr-sync/test/test_sync.ml index a1b2635..28eaf4b 100644 --- a/zarr-sync/test/test_sync.ml +++ b/zarr-sync/test/test_sync.ml @@ -53,6 +53,7 @@ let test_storage ;codecs = [`Bytes LE]} in let anode = Node.Array.(gnode / "arrnode") in let slice = [|R [|0; 20|]; I 10; R [|0; 29|]|] in + let bigger_slice = [|R [|0; 21|]; L [|9; 10|] ; R [|0; 30|]|] in List.iter (fun codecs -> @@ -65,10 +66,13 @@ let test_storage Ndarray.fill exp Complex.{re=2.0; im=0.}; Array.write store anode slice exp; let got = Array.read store anode slice Complex32 in + (* test if a bigger slice containing new elements can be read from store *) + let _ = Array.read store anode bigger_slice Complex32 in assert_equal exp got; - Ndarray.fill exp Complex.{re=0.; im=3.0}; - Array.write store anode slice exp; + (* test writing a bigger slice to store *) + Array.write store anode bigger_slice @@ Ndarray.init Complex32 [|22; 2; 31|] (Fun.const Complex.{re=0.; im=3.0}); let got = Array.read store anode slice Complex32 in + Ndarray.fill exp Complex.{re=0.; im=3.0}; assert_equal exp got; Array.delete store anode) [[`ShardingIndexed cfg]; [`ShardingIndexed cfg2]]; @@ -163,12 +167,7 @@ let test_storage let _ = run_test_tt_main @@ ("Run Zarr sync API tests" >::: [ - - "test in-memory store" >:: - (fun _ -> test_storage (module MemoryStore) @@ MemoryStore.create ()) - ; - - "test filesystem store" >:: + "test sync-based stores" >:: (fun _ -> let rand_num = string_of_int @@ Random.int 1_000_000 in let tmp_dir = Filename.(concat (get_temp_dir_name ()) (rand_num ^ ".zarr")) in @@ -204,5 +203,6 @@ let _ = (Zarr.Storage.Not_a_filesystem_store fn) (fun () -> FilesystemStore.open_store fn); + test_storage (module MemoryStore) @@ MemoryStore.create (); test_storage (module FilesystemStore) s) ]) diff --git a/zarr/src/codecs/array_to_bytes.ml b/zarr/src/codecs/array_to_bytes.ml index f71163e..f43df13 100644 --- a/zarr/src/codecs/array_to_bytes.ml +++ b/zarr/src/codecs/array_to_bytes.ml @@ -213,8 +213,8 @@ end = struct in let ib = encode_index_chain t.index_codecs shard_idx in match t.index_location with - | Start -> String.concat "" @@ ib :: List.rev xs - | End -> String.concat "" @@ List.rev @@ ib :: xs + | Start -> String.concat String.empty @@ ib :: List.rev xs + | End -> String.concat String.empty @@ List.rev @@ ib :: xs let decode_chain t repr x = let shape = List.fold_left ArrayToArray.encoded_repr repr.shape t.a2a in @@ -387,96 +387,158 @@ module Make (Io : Types.IO) = struct open Io open Deferred.Syntax open ShardingIndexedCodec - type t = ShardingIndexedCodec.t + type t = ShardingIndexedCodec.t type set_fn = ?append:bool -> (int * string) list -> unit Deferred.t - let partial_encode t get_partial (set_partial : set_fn) csize repr pairs = + let add_binding ~grid acc (c, v) = + let id, co = RegularGrid.index_coord_pair grid c in + ArrayMap.add_to_list id (co, v) acc + + (* specialized function for partially writing possibly multiple inner chunks + to an empty shard of a designated array using the sharding indexed codec.*) + let partial_encode_empty_shard t (set_partial : set_fn) repr pairs fill_value = + let update_index ~index i z (ofs, acc) = + let arr = Ndarray.create repr.kind t.chunk_shape fill_value in + List.iter (fun (c, v) -> Ndarray.set arr c v) z; + let s = encode_chain t.codecs arr in + let n = String.length s in + Ndarray.set index (Array.append i [|0|]) @@ Stdint.Uint64.of_int ofs; + Ndarray.set index (Array.append i [|1|]) @@ Stdint.Uint64.of_int n; + ofs + n, (ofs, s) :: acc + in + (* simulate the inner chunks of a shard as a regular grid of specified shape.*) + let grid = RegularGrid.create ~array_shape:repr.shape t.chunk_shape in + (* build a finite map with its keys being an inner chunk's index and values + being a list of (coord-within-inner-chunk, new-value) pairs such that + new-value is set for the coordinate coord-within-inner-chunk of the inner + chunk represented by the associated key.*) + let m = List.fold_left (add_binding ~grid) ArrayMap.empty pairs in + let cps = Array.map2 (/) repr.shape t.chunk_shape in + let init = match t.index_location with + | Start -> index_size t.index_codecs cps + | End -> 0 + in + let index = Ndarray.create Uint64 (Array.append cps [|2|]) Stdint.Uint64.max_int in + let shard_size, ranges = ArrayMap.fold (update_index ~index) m (init, []) in + let indexbytes = encode_index_chain t.index_codecs index in + (* write all resultant (offset, bytes) pairs into the bytes of the new shard + taking note to append/prepend the bytes of the shard's index array.*) + match t.index_location with + | Start -> set_partial @@ (0, indexbytes) :: List.rev ranges + | End -> set_partial @@ List.rev @@ (shard_size, indexbytes) :: ranges + + (* function to partially write new values to one or more inner chunks of + an existing shard using the sharding indexed codec. *) + let partial_encode t get_partial (set_partial : set_fn) shardsize repr pairs fv = + if shardsize = 0 then partial_encode_empty_shard t set_partial repr pairs fv else let cps = Array.map2 (/) repr.shape t.chunk_shape in let is = index_size t.index_codecs cps in - let l, pad = - match t.index_location with - | Start -> get_partial [0, Some is], is - | End -> get_partial [csize - is, None], 0 in - let* xs = l in - let idx_arr = fst @@ decode_index t cps @@ List.hd xs in - let grid = RegularGrid.create ~array_shape:repr.shape t.chunk_shape in - let m = - List.fold_left - (fun acc (c, v) -> - let id, co = RegularGrid.index_coord_pair grid c in - ArrayMap.add_to_list id (co, v) acc) ArrayMap.empty pairs + let* l = match t.index_location with + | Start -> get_partial [0, Some is] + | End -> get_partial [shardsize - is, None] in - let bindings = ArrayMap.bindings m in - let ranges, coords = - List.split @@ - List.map - (fun (i, _) -> - let oc = Array.append i [|0|] in - let nc = Array.append i [|1|] in - let ofs = Stdint.Uint64.to_int @@ Ndarray.get idx_arr oc in - let nb = Stdint.Uint64.to_int @@ Ndarray.get idx_arr nc in - (pad + ofs, Some nb), (oc, nc, ofs, nb)) bindings + let idx_arr = fst @@ decode_index t cps @@ List.hd l in + let grid = RegularGrid.create ~array_shape:repr.shape t.chunk_shape in + let m = List.fold_left (add_binding ~grid) ArrayMap.empty pairs in + (* split the finite map m into key-value pairs representing empty inner chunks + and those that don't (using the fact that empty inner chunks have index + array values equal to 2^64 - 1; then process these seperately.*) + let empty, nonempty = List.partition_map + (fun ((i, _) as bd) -> + let oc = Array.append i [|0|] and nc = Array.append i [|1|] in + match Ndarray.(get idx_arr oc, get idx_arr nc) with + | o, n when Stdint.Uint64.(max_int = o && max_int = n) -> + Either.Left ((-1, None), (oc, nc, -1, 0, bd)) + | o, n -> + let o', n' = Stdint.Uint64.(to_int o, to_int n) in + Either.Right ((o', Some n'), (oc, nc, o', n', bd))) ArrayMap.(bindings m) in + let ranges, nonempty' = List.split nonempty in + (* fold over the nonempty index coordinates and finite map to obtain + (offset, bytes) pairs to write in-place and those to append at the + end of the shard. bytes to write in-place are determined by comparing + encoded size vs the corresponding nbytes[i] value already contained in + the shard's index array.*) let* xs = get_partial ranges in let repr' = {repr with shape = t.chunk_shape} in - let bsize, inplace, append = - List.fold_left - (fun (a, l, r) ((x, (oc, nc, ofs, nb)), (_, z)) -> - let arr = decode_chain t.codecs repr' x in - List.iter (fun (c, v) -> Ndarray.set arr c v) z; - let s = encode_chain t.codecs arr in - let nb' = String.length s in - if nb' = nb then a, (pad + ofs, s) :: l, r - else - (Ndarray.set idx_arr oc @@ Stdint.Uint64.of_int a; - Ndarray.set idx_arr nc @@ Stdint.Uint64.of_int nb'; - a + nb', l, (a, s) :: r)) - (csize - pad, [], []) List.(combine (combine xs coords) bindings) + let bsize, inplace, nonempty_append = List.fold_left2 + (fun (acc, l, r) x (oc, nc, ofs, nb, (_, z)) -> + let arr = decode_chain t.codecs repr' x in + List.iter (fun (c, v) -> Ndarray.set arr c v) z; + let s = encode_chain t.codecs arr in + let nb' = String.length s in + if nb' = nb then acc, (ofs, s) :: l, r else begin + Ndarray.set idx_arr oc @@ Stdint.Uint64.of_int acc; + Ndarray.set idx_arr nc @@ Stdint.Uint64.of_int nb'; + acc + nb', l, (acc, s) :: r end) (shardsize, [], []) xs nonempty' in let* () = match inplace with | [] -> Deferred.return_unit - | xs -> set_partial xs + | rs -> set_partial rs in - let* () = match append with + let* () = match nonempty_append with | [] -> Deferred.return_unit - | xs -> set_partial ~append:true xs + | rs -> set_partial ~append:true @@ List.rev rs + in + (* new values that need to be written to previously empty inner chunks will + be appended at the end of the shard and the corresponding index array's + offset and number-of-bytes values updated accordingly.*) + let bsize', empty_append = List.fold_left + (fun (ofs, l) (_, (oc, nc, _, _, (_, z))) -> + let arr = Ndarray.create repr'.kind repr'.shape fv in + List.iter (fun (c, v) -> Ndarray.set arr c v) z; + let s = encode_chain t.codecs arr in + let n = String.length s in + Ndarray.set idx_arr oc @@ Stdint.Uint64.of_int ofs; + Ndarray.set idx_arr nc @@ Stdint.Uint64.of_int n; + ofs + n, (ofs, s) :: l) (bsize, []) empty + in + let* () = match empty_append with + | [] -> Deferred.return_unit + | rs -> set_partial ~append:true @@ List.rev rs in let ib = encode_index_chain t.index_codecs idx_arr in match t.index_location with | Start -> set_partial [0, ib] - | End -> set_partial ~append:true [bsize, ib] + | End -> set_partial ~append:true [bsize', ib] - let partial_decode t get_partial csize repr pairs = + (* function to partially read values off of a non-empty shard previously + encoded using the sharding indexed codec. *) + let partial_decode t get_partial shardsize repr pairs fill_value = + let grid = RegularGrid.create ~array_shape:repr.shape t.chunk_shape in + let add_binding ~grid acc (i, y) = + let id, c = RegularGrid.index_coord_pair grid y in + ArrayMap.add_to_list id (i, c) acc + in + let m = List.fold_left (add_binding ~grid) ArrayMap.empty pairs in let cps = Array.map2 (/) repr.shape t.chunk_shape in let is = index_size t.index_codecs cps in - let l, pad = - match t.index_location with - | Start -> get_partial [0, Some is], is - | End -> get_partial [csize - is, None], 0 in - let* xs = l in - let idx_arr = fst @@ decode_index t cps @@ List.hd xs in - let grid = RegularGrid.create ~array_shape:repr.shape t.chunk_shape in - let m = - List.fold_left - (fun acc (i, y) -> - let id, c = RegularGrid.index_coord_pair grid y in - ArrayMap.add_to_list id (i, c) acc) ArrayMap.empty pairs + let* l = match t.index_location with + | Start -> get_partial [0, Some is] + | End -> get_partial [shardsize - is, None] in - let ranges = - List.map - (fun (i, _) -> - let oc = Array.append i [|0|] in - let nc = Array.append i [|1|] in - let ofs = Stdint.Uint64.to_int @@ Ndarray.get idx_arr oc in - let nb = Stdint.Uint64.to_int @@ Ndarray.get idx_arr nc in - pad + ofs, Some nb) ArrayMap.(bindings m) + let index = fst @@ decode_index t cps @@ List.hd l in + let empty, nonempty = List.partition_map + (fun ((i, _) as bd) -> + match Ndarray.(get index @@ Array.append i [|0|], + get index @@ Array.append i [|1|]) with + | o, n when Stdint.Uint64.(max_int = o && max_int = n) -> + Either.Left ((-1, None), bd) + | o, n -> + let o', n' = Stdint.Uint64.(to_int o, to_int n) in + Either.Right ((o', Some n'), bd)) ArrayMap.(bindings m) in + let ranges, bindings = List.split nonempty in let+ xs = get_partial ranges in let repr' = {repr with shape = t.chunk_shape} in - List.concat_map - (fun (x, (_, z)) -> + let res1 = List.concat @@ List.map2 + (fun x (_, z) -> let arr = decode_chain t.codecs repr' x in - List.map (fun (i, c) -> i, Ndarray.get arr c) z) - List.(combine xs @@ ArrayMap.bindings m) + List.map (fun (i, c) -> i, Ndarray.get arr c) z) xs bindings + in + let res2 = List.concat_map + (fun (_, (_, z)) -> List.map (fun (i, _) -> i, fill_value) z) empty + in + res1 @ res2 end diff --git a/zarr/src/codecs/array_to_bytes.mli b/zarr/src/codecs/array_to_bytes.mli index 476586a..05db801 100644 --- a/zarr/src/codecs/array_to_bytes.mli +++ b/zarr/src/codecs/array_to_bytes.mli @@ -20,6 +20,7 @@ module Make (Io : Types.IO) : sig int -> 'a array_repr -> (int array * 'a) list -> + 'a -> unit Deferred.t val partial_decode : t -> @@ -27,5 +28,6 @@ module Make (Io : Types.IO) : sig int -> 'a array_repr -> (int * int array) list -> + 'a -> (int * 'a) list Deferred.t end diff --git a/zarr/src/codecs/codecs.ml b/zarr/src/codecs/codecs.ml index cfd8202..6ae3d1c 100644 --- a/zarr/src/codecs/codecs.ml +++ b/zarr/src/codecs/codecs.ml @@ -133,15 +133,15 @@ module Make (Io : Types.IO) = struct | {a2a = []; a2b = `ShardingIndexed _; b2b = []} -> true | _ -> false - let partial_encode t f g bsize repr pairs = + let partial_encode t f g bsize repr pairs fv = match t.a2b with | `ShardingIndexed c -> - ShardingIndexedCodec.partial_encode c f g bsize repr pairs + ShardingIndexedCodec.partial_encode c f g bsize repr pairs fv | `Bytes _ -> failwith "bytes codec does not support partial encoding." - let partial_decode t f s repr pairs = + let partial_decode t f s repr pairs fv = match t.a2b with | `ShardingIndexed c -> - ShardingIndexedCodec.partial_decode c f s repr pairs + ShardingIndexedCodec.partial_decode c f s repr pairs fv | `Bytes _ -> failwith "bytes codec does not support partial decoding." end diff --git a/zarr/src/codecs/codecs.mli b/zarr/src/codecs/codecs.mli index 6e91d9c..55ee3cb 100644 --- a/zarr/src/codecs/codecs.mli +++ b/zarr/src/codecs/codecs.mli @@ -59,6 +59,7 @@ module Make (Io : Types.IO) : sig int -> 'a array_repr -> (int array * 'a) list -> + 'a -> unit Io.Deferred.t val partial_decode : @@ -67,5 +68,6 @@ module Make (Io : Types.IO) : sig int -> 'a array_repr -> (int * int array) list -> + 'a -> (int * 'a) list Io.Deferred.t end diff --git a/zarr/src/storage/memory.ml b/zarr/src/storage/memory.ml index b19cb24..a1d3ca7 100644 --- a/zarr/src/storage/memory.ml +++ b/zarr/src/storage/memory.ml @@ -30,7 +30,8 @@ module Make (Deferred : Types.Deferred) = struct then Deferred.return_unit else erase t key let size t key = - Deferred.return @@ String.length @@ M.find key @@ Atomic.get t + Deferred.return @@ + Option.fold ~none:0 ~some:String.length (M.find_opt key @@ Atomic.get t) let rec erase_prefix t prefix = let pred ~prefix k v = if String.starts_with ~prefix k then None else Some v in @@ -48,17 +49,18 @@ module Make (Deferred : Types.Deferred) = struct List.fold_left (add ~value ~size) [] (List.rev ranges) let rec set_partial_values t key ?(append=false) rv = - let f = if append then fun acc (_, v) -> acc ^ v else + let m = Atomic.get (t : t) in + let ov = Option.fold ~none:String.empty ~some:Fun.id @@ M.find_opt key m in + let f = if append || ov = String.empty then + fun acc (_, v) -> acc ^ v else fun acc (rs, v) -> let s = Bytes.of_string acc in - String.(length v |> Bytes.blit_string v 0 s rs); - Bytes.to_string s in - let m = Atomic.get (t : t) in - let ov = M.find key m in + Bytes.blit_string v 0 s rs String.(length v); + Bytes.to_string s + in let m' = M.add key (List.fold_left f ov rv) m in - let success = Atomic.compare_and_set t m m' in - if not success then set_partial_values t key ~append rv - else Deferred.return_unit + if Atomic.compare_and_set t m m' + then Deferred.return_unit else set_partial_values t key ~append rv let list_dir t prefix = let module S = Set.Make(String) in diff --git a/zarr/src/storage/storage.ml b/zarr/src/storage/storage.ml index 5c840f5..f665aef 100644 --- a/zarr/src/storage/storage.ml +++ b/zarr/src/storage/storage.ml @@ -9,7 +9,7 @@ module ArraySet = Set.Make (struct end) module Make (Io : Types.IO) = struct - module PartialChain = Codecs.Make(Io) + module Io_chain = Codecs.Make(Io) module Deferred = Io.Deferred open Io @@ -113,29 +113,31 @@ module Make (Io : Types.IO) = struct ArrayMap.empty @@ Array.combine (Indexing.coords_of_slice slice shape) (Ndarray.to_array x) in - let fv = Metadata.Array.fillvalue_of_kind meta kind in + let fill_value = Metadata.Array.fillvalue_of_kind meta kind in let repr = Codecs.{kind; shape = Metadata.Array.chunk_shape meta} in let prefix = Node.Array.to_key node ^ "/" in let chain = Metadata.Array.codecs meta in (* NOTE: there is opportunity to compute this step in parallel since each iteration acts on independent chunks. Maybe use Domainslib? *) - ArrayMap.bindings m |> Deferred.iter @@ fun (idx, pairs) -> - let ckey = prefix ^ Metadata.Array.chunk_key meta idx in - is_member t ckey >>= function - | true when PartialChain.is_just_sharding chain -> - let* csize = size t ckey in - let get_p = get_partial_values t ckey in - let set_p = set_partial_values t ckey in - PartialChain.partial_encode chain get_p set_p csize repr pairs - | true -> - let* v = get t ckey in - let arr = Codecs.Chain.decode chain repr v in - List.iter (fun (c, v) -> Ndarray.set arr c v) pairs; - set t ckey @@ Codecs.Chain.encode chain arr - | false -> - let arr = Ndarray.create repr.kind repr.shape fv in - List.iter (fun (c, v) -> Ndarray.set arr c v) pairs; - set t ckey @@ Codecs.Chain.encode chain arr + let update_chunk (idx, pairs) = + let ckey = prefix ^ Metadata.Array.chunk_key meta idx in + if Io_chain.is_just_sharding chain then + let* shardsize = size t ckey in + let pget = get_partial_values t ckey in + let pset = set_partial_values t ckey in + Io_chain.partial_encode chain pget pset shardsize repr pairs fill_value + else is_member t ckey >>= function + | true -> + let* v = get t ckey in + let arr = Codecs.Chain.decode chain repr v in + List.iter (fun (c, v) -> Ndarray.set arr c v) pairs; + set t ckey @@ Codecs.Chain.encode chain arr + | false -> + let arr = Ndarray.create repr.kind repr.shape fill_value in + List.iter (fun (c, v) -> Ndarray.set arr c v) pairs; + set t ckey @@ Codecs.Chain.encode chain arr + in + Deferred.iter update_chunk ArrayMap.(bindings m) let read : type a. t -> @@ -164,20 +166,20 @@ module Make (Io : Types.IO) = struct let repr = Codecs.{kind; shape = Metadata.Array.chunk_shape meta} in (* NOTE: there is opportunity to compute this step in parallel since each iteration acts on independent chunks. *) - let+ pairs = ArrayMap.bindings m |> Deferred.concat_map @@ fun (idx, pairs) -> + (* `pairs` argument is a list of (C-order index of value within slice, coordinate within chunk) pairs.*) + let read_chunk_data (idx, pairs) = let ckey = prefix ^ Metadata.Array.chunk_key meta idx in - is_member t ckey >>= function - | true when PartialChain.is_just_sharding chain -> - let get_p = get_partial_values t ckey in - let* csize = size t ckey in - PartialChain.partial_decode chain get_p csize repr pairs - | true -> + size t ckey >>= function + | 0 -> Deferred.return @@ List.map (fun (i, _) -> i, fill_value) pairs + | shardsize when Io_chain.is_just_sharding chain -> + let pget = get_partial_values t ckey in + Io_chain.partial_decode chain pget shardsize repr pairs fill_value + | _ -> let+ v = get t ckey in let arr = Codecs.Chain.decode chain repr v in List.map (fun (i, c) -> i, Ndarray.get arr c) pairs - | false -> - Deferred.return @@ List.map (fun (i, _) -> i, fill_value) pairs in + let+ pairs = Deferred.concat_map read_chunk_data ArrayMap.(bindings m) in (* sorting restores the C-order of the decoded array coordinates. *) let v = Array.of_list @@ snd @@ List.split @@