diff --git a/include/abstract_data_store.h b/include/abstract_data_store.h index d858c8eef..e0c9a99e8 100644 --- a/include/abstract_data_store.h +++ b/include/abstract_data_store.h @@ -9,6 +9,8 @@ #include "types.h" #include "windows_customizations.h" #include "distance.h" +#include "aligned_file_reader.h" + namespace diskann { @@ -21,13 +23,15 @@ template class AbstractDataStore virtual ~AbstractDataStore() = default; // Return number of points returned - virtual location_t load(const std::string &filename) = 0; + virtual location_t load(const std::string &filename, size_t offset) = 0; + virtual location_t load(AlignedFileReader &reader, size_t offset) = 0; // Why does store take num_pts? Since store only has capacity, but we allow // resizing we can end up in a situation where the store has spare capacity. // To optimize disk utilization, we pass the number of points that are "true" // points, so that the store can discard the empty locations before saving. virtual size_t save(const std::string &filename, const location_t num_pts) = 0; + virtual size_t save(std::ofstream &writer, const location_t num_pts, size_t offset) = 0; DISKANN_DLLEXPORT virtual location_t capacity() const; diff --git a/include/abstract_graph_store.h b/include/abstract_graph_store.h index 4d6906ca4..5c239da7e 100644 --- a/include/abstract_graph_store.h +++ b/include/abstract_graph_store.h @@ -7,6 +7,8 @@ #include #include "types.h" +class AlignedFileReader; + namespace diskann { @@ -21,11 +23,20 @@ class AbstractGraphStore virtual ~AbstractGraphStore() = default; // returns tuple of - virtual std::tuple load(const std::string &index_path_prefix, - const size_t num_points) = 0; +#ifdef EXEC_ENV_OLS + virtual std::tuple load(AlignedFileReader &reader, const size_t num_points, + size_t offset) = 0; +#else + virtual std::tuple load(const std::string &index_path_prefix, const size_t num_points, + size_t offset) = 0; +#endif + virtual int store(const std::string &index_path_prefix, const size_t num_points, const size_t num_fz_points, const uint32_t start) = 0; + virtual int store(std::ofstream &writer, const size_t num_points, const size_t num_fz_points, const uint32_t start, + size_t offset) = 0; + // not synchronised, user should use lock when necvessary. virtual const std::vector &get_neighbours(const location_t i) const = 0; virtual void add_neighbour(const location_t i, location_t neighbour_id) = 0; diff --git a/include/aligned_file_reader.h b/include/aligned_file_reader.h index f5e2af5c3..f39d5da39 100644 --- a/include/aligned_file_reader.h +++ b/include/aligned_file_reader.h @@ -117,4 +117,9 @@ class AlignedFileReader // process batch of aligned requests in parallel // NOTE :: blocking call virtual void read(std::vector &read_reqs, IOContext &ctx, bool async = false) = 0; + +#ifdef USE_BING_INFRA + // wait for completion of one request in a batch of requests + virtual void wait(IOContext &ctx, int &completedIndex) = 0; +#endif }; diff --git a/include/defaults.h b/include/defaults.h index 5ea5af495..ef1750fcf 100644 --- a/include/defaults.h +++ b/include/defaults.h @@ -17,7 +17,7 @@ const uint32_t NUM_FROZEN_POINTS_STATIC = 0; const uint32_t NUM_FROZEN_POINTS_DYNAMIC = 1; // In-mem index related limits -const float GRAPH_SLACK_FACTOR = 1.3; +const float GRAPH_SLACK_FACTOR = 1.3f; // SSD Index related limits const uint64_t MAX_GRAPH_DEGREE = 512; diff --git a/include/distance.h b/include/distance.h index 8b20e586b..065b38231 100644 --- a/include/distance.h +++ b/include/distance.h @@ -1,5 +1,6 @@ #pragma once #include "windows_customizations.h" +#include #include namespace diskann diff --git a/include/in_mem_data_store.h b/include/in_mem_data_store.h index 9b6968b03..6feb09199 100644 --- a/include/in_mem_data_store.h +++ b/include/in_mem_data_store.h @@ -24,8 +24,10 @@ template class InMemDataStore : public AbstractDataStore> distance_fn); virtual ~InMemDataStore(); - virtual location_t load(const std::string &filename) override; - virtual size_t save(const std::string &filename, const location_t num_points) override; + virtual location_t load(const std::string &filename, size_t offset = 0) override; + virtual location_t load(AlignedFileReader &reader, size_t offset = 0) override; + virtual size_t save(const std::string &filename, const location_t num_pts) override; + virtual size_t save(std::ofstream &writer, const location_t num_pts, size_t offset) override; virtual size_t get_aligned_dim() const override; @@ -59,9 +61,9 @@ template class InMemDataStore : public AbstractDataStore - virtual std::tuple load(const std::string &index_path_prefix, - const size_t num_points) override; +#ifdef EXEC_ENV_OLS + virtual std::tuple load(AlignedFileReader &reader, const size_t num_points, + size_t offset) override; +#else + virtual std::tuple load(const std::string &filename, size_t expected_num_points, + size_t offset); +#endif virtual int store(const std::string &index_path_prefix, const size_t num_points, const size_t num_frozen_points, const uint32_t start) override; - + virtual int store(std::ofstream &writer, const size_t num_points, const size_t num_fz_points, const uint32_t start, + size_t offset) override; virtual const std::vector &get_neighbours(const location_t i) const override; virtual void add_neighbour(const location_t i, location_t neighbour_id) override; virtual void clear_neighbours(const location_t i) override; @@ -33,13 +39,16 @@ class InMemGraphStore : public AbstractGraphStore virtual uint32_t get_max_observed_degree() override; protected: - virtual std::tuple load_impl(const std::string &filename, size_t expected_num_points); #ifdef EXEC_ENV_OLS - virtual std::tuple load_impl(AlignedFileReader &reader, size_t expected_num_points); + virtual std::tuple load_impl(AlignedFileReader &reader, size_t expected_num_points, + size_t offset); +#else + virtual std::tuple load_impl(const std::string &filename, size_t expected_num_points, + size_t offset); #endif - int save_graph(const std::string &index_path_prefix, const size_t active_points, const size_t num_frozen_points, - const uint32_t start); + int save_graph(std::ofstream &writer, const size_t active_points, const size_t num_frozen_points, + const uint32_t start, size_t offset); private: size_t _max_range_of_graph = 0; diff --git a/include/index.h b/include/index.h index e7966461c..60c218776 100644 --- a/include/index.h +++ b/include/index.h @@ -28,6 +28,16 @@ namespace diskann { +// This struct is used for storing metadata for save_as_one_file version 1. +struct SaveLoadMetaDataV1 +{ + uint64_t data_offset; + uint64_t delete_list_offset; + uint64_t tags_offset; + uint64_t graph_offset; + + SaveLoadMetaDataV1(); +}; inline double estimate_ram_usage(size_t size, uint32_t dim, uint32_t datasize, uint32_t degree) { @@ -57,7 +67,9 @@ template clas const size_t num_frozen_pts = 0, const bool dynamic_index = false, const bool enable_tags = false, const bool concurrent_consolidate = false, const bool pq_dist_build = false, const size_t num_pq_chunks = 0, - const bool use_opq = false, const bool filtered_index = false); + const bool use_opq = false, const bool filtered_index = false, + bool save_as_one_file = false, uint64_t save_as_one_file_version = 1, + bool load_from_one_file = false, uint64_t load_from_one_file_version = 1); DISKANN_DLLEXPORT Index(const IndexConfig &index_config, std::unique_ptr> data_store, std::unique_ptr graph_store); @@ -80,6 +92,7 @@ template clas // get some private variables DISKANN_DLLEXPORT size_t get_num_points(); DISKANN_DLLEXPORT size_t get_max_points(); + DISKANN_DLLEXPORT size_t get_num_deleted_points(); DISKANN_DLLEXPORT bool detect_common_filters(uint32_t point_id, bool search_invocation, const std::vector &incoming_labels); @@ -294,7 +307,7 @@ template clas // Renumber nodes, update tag and location maps and compact the // graph, mode = _consolidated_order in case of lazy deletion and // _compacted_order in case of eager deletion - DISKANN_DLLEXPORT void compact_data(); + DISKANN_DLLEXPORT void compact_data(bool forced = false); DISKANN_DLLEXPORT void compact_frozen_point(); // Remove deleted nodes from adjacency list of node loc @@ -313,15 +326,15 @@ template clas DISKANN_DLLEXPORT size_t save_tags(std::string filename); DISKANN_DLLEXPORT size_t save_delete_list(const std::string &filename); #ifdef EXEC_ENV_OLS - DISKANN_DLLEXPORT size_t load_graph(AlignedFileReader &reader, size_t expected_num_points); - DISKANN_DLLEXPORT size_t load_data(AlignedFileReader &reader); - DISKANN_DLLEXPORT size_t load_tags(AlignedFileReader &reader); - DISKANN_DLLEXPORT size_t load_delete_set(AlignedFileReader &reader); + DISKANN_DLLEXPORT size_t load_graph(AlignedFileReader &reader, size_t expected_num_points, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_data(AlignedFileReader &reader, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_tags(AlignedFileReader &reader, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_delete_set(AlignedFileReader &reader, size_t offset = 0); #else - DISKANN_DLLEXPORT size_t load_graph(const std::string filename, size_t expected_num_points); - DISKANN_DLLEXPORT size_t load_data(std::string filename0); - DISKANN_DLLEXPORT size_t load_tags(const std::string tag_file_name); - DISKANN_DLLEXPORT size_t load_delete_set(const std::string &filename); + DISKANN_DLLEXPORT size_t load_graph(const std::string filename, size_t expected_num_points, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_data(std::string filename, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_tags(const std::string &filename, size_t offset = 0); + DISKANN_DLLEXPORT size_t load_delete_set(const std::string &filename, size_t offset = 0); #endif private: @@ -360,7 +373,10 @@ template clas bool _has_built = false; bool _saturate_graph = false; - bool _save_as_one_file = false; // plan to support in next version + bool _save_as_one_file; // plan to support filtered index in next version. + uint64_t _save_as_one_file_version; // Version used for save index to single file. + bool _load_from_one_file; // Whether to load index from single file. + uint64_t _load_from_one_file_version; // Version used for save index to single file. bool _dynamic_index = false; bool _enable_tags = false; bool _normalize_vecs = false; // Using normalied L2 for cosine. diff --git a/include/index_config.h b/include/index_config.h index 452498b01..661949e2d 100644 --- a/include/index_config.h +++ b/include/index_config.h @@ -28,6 +28,10 @@ struct IndexConfig bool concurrent_consolidate; bool use_opq; bool filtered_index; + bool save_as_one_file; + uint64_t save_as_one_file_version; + bool load_from_one_file; + uint64_t load_from_one_file_version; size_t num_pq_chunks; size_t num_frozen_pts; @@ -45,12 +49,15 @@ struct IndexConfig IndexConfig(DataStoreStrategy data_strategy, GraphStoreStrategy graph_strategy, Metric metric, size_t dimension, size_t max_points, size_t num_pq_chunks, size_t num_frozen_points, bool dynamic_index, bool enable_tags, bool pq_dist_build, bool concurrent_consolidate, bool use_opq, bool filtered_index, - std::string &data_type, const std::string &tag_type, const std::string &label_type, - std::shared_ptr index_write_params, + bool save_as_one_file, uint64_t save_as_one_file_version, bool load_from_one_file, + uint64_t load_from_one_file_version, std::string &data_type, const std::string &tag_type, + const std::string &label_type, std::shared_ptr index_write_params, std::shared_ptr index_search_params) : data_strategy(data_strategy), graph_strategy(graph_strategy), metric(metric), dimension(dimension), max_points(max_points), dynamic_index(dynamic_index), enable_tags(enable_tags), pq_dist_build(pq_dist_build), concurrent_consolidate(concurrent_consolidate), use_opq(use_opq), filtered_index(filtered_index), + save_as_one_file(save_as_one_file), save_as_one_file_version(save_as_one_file_version), + load_from_one_file(load_from_one_file), load_from_one_file_version(load_from_one_file_version), num_pq_chunks(num_pq_chunks), num_frozen_pts(num_frozen_points), label_type(label_type), tag_type(tag_type), data_type(data_type), index_write_params(index_write_params), index_search_params(index_search_params) { @@ -194,6 +201,30 @@ class IndexConfigBuilder return *this; } + IndexConfigBuilder &with_save_as_single_file(bool save_as_one_file) + { + this->_save_as_one_file = save_as_one_file; + return *this; + } + + IndexConfigBuilder &with_save_as_single_file_version(uint64_t save_as_one_file_version) + { + this->_save_as_one_file_version = save_as_one_file_version; + return *this; + } + + IndexConfigBuilder &with_load_from_single_file(bool load_from_one_file) + { + this->_load_from_one_file = load_from_one_file; + return *this; + } + + IndexConfigBuilder &with_load_from_single_file_version(uint64_t load_from_one_file_version) + { + this->_load_from_one_file_version = load_from_one_file_version; + return *this; + } + IndexConfig build() { if (_data_type == "" || _data_type.empty()) @@ -219,7 +250,8 @@ class IndexConfigBuilder return IndexConfig(_data_strategy, _graph_strategy, _metric, _dimension, _max_points, _num_pq_chunks, _num_frozen_pts, _dynamic_index, _enable_tags, _pq_dist_build, _concurrent_consolidate, - _use_opq, _filtered_index, _data_type, _tag_type, _label_type, _index_write_params, + _use_opq, _filtered_index, _save_as_one_file, _save_as_one_file_version, _load_from_one_file, + _load_from_one_file_version, _data_type, _tag_type, _label_type, _index_write_params, _index_search_params); } @@ -240,6 +272,10 @@ class IndexConfigBuilder bool _concurrent_consolidate = false; bool _use_opq = false; bool _filtered_index{defaults::HAS_LABELS}; + bool _save_as_one_file; + uint64_t _save_as_one_file_version; + bool _load_from_one_file; + uint64_t _load_from_one_file_version; size_t _num_pq_chunks = 0; size_t _num_frozen_pts{defaults::NUM_FROZEN_POINTS_STATIC}; diff --git a/include/parameters.h b/include/parameters.h index 2bba9aeca..3c771a730 100644 --- a/include/parameters.h +++ b/include/parameters.h @@ -16,15 +16,7 @@ class IndexWriteParameters { public: - const uint32_t search_list_size; // L - const uint32_t max_degree; // R - const bool saturate_graph; - const uint32_t max_occlusion_size; // C - const float alpha; - const uint32_t num_threads; - const uint32_t filter_list_size; // Lf - private: IndexWriteParameters(const uint32_t search_list_size, const uint32_t max_degree, const bool saturate_graph, const uint32_t max_occlusion_size, const float alpha, const uint32_t num_threads, const uint32_t filter_list_size) @@ -34,6 +26,14 @@ class IndexWriteParameters { } + const uint32_t search_list_size; // L + const uint32_t max_degree; // R + const bool saturate_graph; + const uint32_t max_occlusion_size; // C + const float alpha; + const uint32_t num_threads; + const uint32_t filter_list_size; // Lf + friend class IndexWriteParametersBuilder; }; diff --git a/include/utils.h b/include/utils.h index bb03d13f1..9011634f5 100644 --- a/include/utils.h +++ b/include/utils.h @@ -714,13 +714,8 @@ inline void open_file_to_write(std::ofstream &writer, const std::string &filenam } } -template -inline size_t save_bin(const std::string &filename, T *data, size_t npts, size_t ndims, size_t offset = 0) +template inline size_t save_bin(std::ofstream &writer, T *data, size_t npts, size_t ndims, size_t offset) { - std::ofstream writer; - open_file_to_write(writer, filename); - - diskann::cout << "Writing bin: " << filename.c_str() << std::endl; writer.seekp(offset, writer.beg); int npts_i32 = (int)npts, ndims_i32 = (int)ndims; size_t bytes_written = npts * ndims * sizeof(T) + 2 * sizeof(uint32_t); @@ -730,11 +725,22 @@ inline size_t save_bin(const std::string &filename, T *data, size_t npts, size_t << std::endl; writer.write((char *)data, npts * ndims * sizeof(T)); - writer.close(); diskann::cout << "Finished writing bin." << std::endl; return bytes_written; } +template +inline size_t save_bin(const std::string &filename, T *data, size_t npts, size_t ndims, size_t offset = 0) +{ + std::ofstream writer; + open_file_to_write(writer, filename); + diskann::cout << "Writing bin file: " << filename.c_str() << std::endl; + size_t bytes_written = save_bin(writer, data, npts, ndims, offset); + writer.close(); + diskann::cout << "Close file " << filename << "." << std::endl; + return bytes_written; +} + inline void print_progress(double percentage) { int val = (int)(percentage * 100); @@ -938,12 +944,11 @@ template void save_Tvecs(const char *filename, T *data, size_t npts writer.write((char *)cur_pt, ndims * sizeof(T)); } } + template -inline size_t save_data_in_base_dimensions(const std::string &filename, T *data, size_t npts, size_t ndims, - size_t aligned_dim, size_t offset = 0) +inline size_t save_data_in_base_dimensions(std::ofstream &writer, T *data, size_t npts, size_t ndims, + size_t aligned_dim, size_t offset) { - std::ofstream writer; //(filename, std::ios::binary | std::ios::out); - open_file_to_write(writer, filename); int npts_i32 = (int)npts, ndims_i32 = (int)ndims; size_t bytes_written = 2 * sizeof(uint32_t) + npts * ndims * sizeof(T); writer.seekp(offset, writer.beg); @@ -953,10 +958,21 @@ inline size_t save_data_in_base_dimensions(const std::string &filename, T *data, { writer.write((char *)(data + i * aligned_dim), ndims * sizeof(T)); } - writer.close(); return bytes_written; } +template +inline size_t save_data_in_base_dimensions(const std::string &filename, T *data, size_t npts, size_t ndims, + size_t aligned_dim, size_t offset = 0) +{ + std::ofstream writer; //(filename, std::ios::binary | std::ios::out); + open_file_to_write(writer, filename); + size_t file_size = save_data_in_base_dimensions(writer, data, npts, ndims, aligned_dim, offset); + writer.close(); + + return file_size; +} + template inline void copy_aligned_data_from_file(const char *bin_file, T *&data, size_t &npts, size_t &dim, const size_t &rounded_dim, size_t offset = 0) @@ -968,11 +984,12 @@ inline void copy_aligned_data_from_file(const char *bin_file, T *&data, size_t & throw diskann::ANNException("Null pointer passed to copy_aligned_data_from_file function", -1, __FUNCSIG__, __FILE__, __LINE__); } + std::ifstream reader; reader.exceptions(std::ios::badbit | std::ios::failbit); reader.open(bin_file, std::ios::binary); - reader.seekg(offset, reader.beg); + reader.seekg(offset, reader.beg); int npts_i32, dim_i32; reader.read((char *)&npts_i32, sizeof(int)); reader.read((char *)&dim_i32, sizeof(int)); diff --git a/src/distance.cpp b/src/distance.cpp index 31ab9d3ff..f1c1a317a 100644 --- a/src/distance.cpp +++ b/src/distance.cpp @@ -730,4 +730,7 @@ template DISKANN_DLLEXPORT class SlowDistanceL2; template DISKANN_DLLEXPORT class SlowDistanceL2; template DISKANN_DLLEXPORT class SlowDistanceL2; +template DISKANN_DLLEXPORT Distance *get_distance_function(Metric m); +template DISKANN_DLLEXPORT Distance *get_distance_function(Metric m); +template DISKANN_DLLEXPORT Distance *get_distance_function(Metric m); } // namespace diskann diff --git a/src/in_mem_data_store.cpp b/src/in_mem_data_store.cpp index 7d02bba17..e168d96fa 100644 --- a/src/in_mem_data_store.cpp +++ b/src/in_mem_data_store.cpp @@ -37,17 +37,21 @@ template size_t InMemDataStore::get_alignment_factor() return _distance_fn->get_required_alignment(); } -template location_t InMemDataStore::load(const std::string &filename) +template location_t InMemDataStore::load(const std::string &filename, size_t offset) { - return load_impl(filename); + return load_impl(filename, offset); +} + +template location_t InMemDataStore::load(AlignedFileReader &reader, size_t offset) +{ + return load_impl(reader, offset); } #ifdef EXEC_ENV_OLS -template location_t InMemDataStore::load_impl(AlignedFileReader &reader) +template location_t InMemDataStore::load_impl(AlignedFileReader &reader, size_t offset) { size_t file_dim, file_num_points; - - diskann::get_bin_metadata(reader, file_num_points, file_dim); + diskann::get_bin_metadata(reader, file_num_points, file_dim, offset); if (file_dim != this->_dim) { @@ -63,13 +67,14 @@ template location_t InMemDataStore::load_impl(AlignedF { this->resize((location_t)file_num_points); } - copy_aligned_data_from_file(reader, _data, file_num_points, file_dim, _aligned_dim); + + copy_aligned_data_from_file(reader, _data, file_num_points, file_dim, _aligned_dim, offset); return (location_t)file_num_points; } #endif -template location_t InMemDataStore::load_impl(const std::string &filename) +template location_t InMemDataStore::load_impl(const std::string &filename, size_t offset) { size_t file_dim, file_num_points; if (!file_exists(filename)) @@ -80,7 +85,7 @@ template location_t InMemDataStore::load_impl(const st aligned_free(_data); throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } - diskann::get_bin_metadata(filename, file_num_points, file_dim); + diskann::get_bin_metadata(filename, file_num_points, file_dim, offset); if (file_dim != this->_dim) { @@ -97,14 +102,20 @@ template location_t InMemDataStore::load_impl(const st this->resize((location_t)file_num_points); } - copy_aligned_data_from_file(filename.c_str(), _data, file_num_points, file_dim, _aligned_dim); + copy_aligned_data_from_file(filename.c_str(), _data, file_num_points, file_dim, _aligned_dim, offset); return (location_t)file_num_points; } -template size_t InMemDataStore::save(const std::string &filename, const location_t num_points) +template size_t InMemDataStore::save(const std::string &filename, const location_t num_pts) +{ + return save_data_in_base_dimensions(filename, _data, num_pts, this->get_dims(), this->get_aligned_dim(), 0U); +} + +template +size_t InMemDataStore::save(std::ofstream &writer, const location_t num_pts, size_t offset) { - return save_data_in_base_dimensions(filename, _data, num_points, this->get_dims(), this->get_aligned_dim(), 0U); + return save_data_in_base_dimensions(writer, _data, num_pts, this->get_dims(), this->get_aligned_dim(), offset); } template void InMemDataStore::populate_data(const data_t *vectors, const location_t num_pts) diff --git a/src/in_mem_graph_store.cpp b/src/in_mem_graph_store.cpp index c12b2514e..8d8e83cc4 100644 --- a/src/in_mem_graph_store.cpp +++ b/src/in_mem_graph_store.cpp @@ -4,6 +4,7 @@ #include "in_mem_graph_store.h" #include "utils.h" + namespace diskann { InMemGraphStore::InMemGraphStore(const size_t total_pts, const size_t reserve_graph_degree) @@ -16,16 +17,38 @@ InMemGraphStore::InMemGraphStore(const size_t total_pts, const size_t reserve_gr } } +#ifdef EXEC_ENV_OLS +std::tuple InMemGraphStore::load(AlignedFileReader &reader, + const size_t num_points, size_t offset) +{ + + return load_impl(reader, num_points, offset); +} +#else std::tuple InMemGraphStore::load(const std::string &index_path_prefix, - const size_t num_points) + const size_t num_points, size_t offset) { - return load_impl(index_path_prefix, num_points); + + return load_impl(index_path_prefix, num_points, offset); } +#endif int InMemGraphStore::store(const std::string &index_path_prefix, const size_t num_points, const size_t num_frozen_points, const uint32_t start) { - return save_graph(index_path_prefix, num_points, num_frozen_points, start); + std::ofstream writer; + open_file_to_write(writer, index_path_prefix); + int file_size = store(writer, num_points, num_frozen_points, start, 0U); + writer.close(); + + return file_size; +} + +int InMemGraphStore::store(std::ofstream &writer, const size_t num_points, const size_t num_frozen_points, + const uint32_t start, size_t offset) +{ + return save_graph(writer, num_points, num_frozen_points, start, offset); } + const std::vector &InMemGraphStore::get_neighbours(const location_t i) const { return _graph.at(i); @@ -71,16 +94,17 @@ void InMemGraphStore::clear_graph() } #ifdef EXEC_ENV_OLS -std::tuple InMemGraphStore::load_impl(AlignedFileReader &reader, size_t expected_num_points) +std::tuple InMemGraphStore::load_impl(AlignedFileReader &reader, + size_t expected_num_points, + size_t offset) { size_t expected_file_size; size_t file_frozen_pts; uint32_t start; - auto max_points = get_max_points(); int header_size = 2 * sizeof(size_t) + 2 * sizeof(uint32_t); std::unique_ptr header = std::make_unique(header_size); - read_array(reader, header.get(), header_size); + read_array(reader, header.get(), header_size, offset); expected_file_size = *((size_t *)header.get()); _max_observed_degree = *((uint32_t *)(header.get() + sizeof(size_t))); @@ -91,7 +115,7 @@ std::tuple InMemGraphStore::load_impl(AlignedFileRea << ", _max_observed_degree: " << _max_observed_degree << ", _start: " << start << ", file_frozen_pts: " << file_frozen_pts << std::endl; - diskann::cout << "Loading vamana graph from reader..." << std::flush; + diskann::cout << "Loading vamana graph from reader..." << std::endl << std::flush; // If user provides more points than max_points // resize the _graph to the larger size. @@ -103,7 +127,7 @@ std::tuple InMemGraphStore::load_impl(AlignedFileRea uint32_t nodes_read = 0; size_t cc = 0; - size_t graph_offset = header_size; + size_t graph_offset = header_size + offset; while (nodes_read < expected_num_points) { uint32_t k; @@ -111,9 +135,12 @@ std::tuple InMemGraphStore::load_impl(AlignedFileRea graph_offset += sizeof(uint32_t); std::vector tmp(k); tmp.reserve(k); - read_array(reader, tmp.data(), k, graph_offset); - graph_offset += k * sizeof(uint32_t); - cc += k; + if (k > 0) + { + read_array(reader, tmp.data(), k, graph_offset); + graph_offset += k * sizeof(uint32_t); + cc += k; + } _graph[nodes_read].swap(tmp); nodes_read++; if (nodes_read % 1000000 == 0) @@ -130,20 +157,19 @@ std::tuple InMemGraphStore::load_impl(AlignedFileRea << std::endl; return std::make_tuple(nodes_read, start, file_frozen_pts); } -#endif +#else std::tuple InMemGraphStore::load_impl(const std::string &filename, - size_t expected_num_points) + size_t expected_num_points, size_t offset) { size_t expected_file_size; size_t file_frozen_pts; uint32_t start; - size_t file_offset = 0; // will need this for single file format support std::ifstream in; in.exceptions(std::ios::badbit | std::ios::failbit); in.open(filename, std::ios::binary); - in.seekg(file_offset, in.beg); + in.seekg(offset, in.beg); in.read((char *)&expected_file_size, sizeof(size_t)); in.read((char *)&_max_observed_degree, sizeof(uint32_t)); in.read((char *)&start, sizeof(uint32_t)); @@ -196,36 +222,34 @@ std::tuple InMemGraphStore::load_impl(const std::str << std::endl; return std::make_tuple(nodes_read, start, file_frozen_pts); } +#endif -int InMemGraphStore::save_graph(const std::string &index_path_prefix, const size_t num_points, - const size_t num_frozen_points, const uint32_t start) +int InMemGraphStore::save_graph(std::ofstream &writer, const size_t num_points, const size_t num_frozen_points, + const uint32_t start, size_t offset) { - std::ofstream out; - open_file_to_write(out, index_path_prefix); - - size_t file_offset = 0; - out.seekp(file_offset, out.beg); + writer.seekp(offset, writer.beg); size_t index_size = 24; uint32_t max_degree = 0; - out.write((char *)&index_size, sizeof(uint64_t)); - out.write((char *)&_max_observed_degree, sizeof(uint32_t)); + writer.write((char *)&index_size, sizeof(uint64_t)); + writer.write((char *)&_max_observed_degree, sizeof(uint32_t)); uint32_t ep_u32 = start; - out.write((char *)&ep_u32, sizeof(uint32_t)); - out.write((char *)&num_frozen_points, sizeof(size_t)); + writer.write((char *)&ep_u32, sizeof(uint32_t)); + writer.write((char *)&num_frozen_points, sizeof(size_t)); // Note: num_points = _nd + _num_frozen_points for (uint32_t i = 0; i < num_points; i++) { uint32_t GK = (uint32_t)_graph[i].size(); - out.write((char *)&GK, sizeof(uint32_t)); - out.write((char *)_graph[i].data(), GK * sizeof(uint32_t)); + writer.write((char *)&GK, sizeof(uint32_t)); + writer.write((char *)_graph[i].data(), GK * sizeof(uint32_t)); max_degree = _graph[i].size() > max_degree ? (uint32_t)_graph[i].size() : max_degree; index_size += (size_t)(sizeof(uint32_t) * (GK + 1)); } - out.seekp(file_offset, out.beg); - out.write((char *)&index_size, sizeof(uint64_t)); - out.write((char *)&max_degree, sizeof(uint32_t)); - out.close(); + + writer.seekp(offset, writer.beg); + writer.write((char *)&index_size, sizeof(uint64_t)); + writer.write((char *)&max_degree, sizeof(uint32_t)); + return (int)index_size; } diff --git a/src/index.cpp b/src/index.cpp index 3de3a3b7f..2fca21ad3 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -25,6 +25,11 @@ namespace diskann { +SaveLoadMetaDataV1::SaveLoadMetaDataV1() : data_offset(0), delete_list_offset(0), tags_offset(0), graph_offset(0) +{ +} + + // Initialize an index with metric m, load the data of type T with filename // (bin), and initialize max_points template @@ -34,7 +39,10 @@ Index::Index(const IndexConfig &index_config, std::unique_ptr), _conc_consolidate(index_config.concurrent_consolidate) { if (_dynamic_index && !_enable_tags) @@ -125,7 +133,8 @@ Index::Index(Metric m, const size_t dim, const size_t max_point const std::shared_ptr index_search_params, const size_t num_frozen_pts, const bool dynamic_index, const bool enable_tags, const bool concurrent_consolidate, const bool pq_dist_build, const size_t num_pq_chunks, const bool use_opq, - const bool filtered_index) + const bool filtered_index, bool save_as_one_file, uint64_t save_as_one_file_version, + bool load_from_one_file, uint64_t load_from_one_file_version) : Index(IndexConfigBuilder() .with_metric(m) .with_dimension(dim) @@ -141,13 +150,17 @@ Index::Index(Metric m, const size_t dim, const size_t max_point .is_use_opq(use_opq) .is_filtered(filtered_index) .with_data_type(diskann_type_to_name()) + .with_save_as_single_file(save_as_one_file) + .with_save_as_single_file_version(save_as_one_file_version) + .with_load_from_single_file(load_from_one_file) + .with_load_from_single_file_version(load_from_one_file_version) .build(), IndexFactory::construct_datastore( DataStoreStrategy::MEMORY, - max_points + (dynamic_index && num_frozen_pts == 0 ? (size_t)1 : num_frozen_pts), dim, m), + (max_points == 0? (size_t)1 : max_points) + (dynamic_index && num_frozen_pts == 0 ? (size_t)1 : num_frozen_pts), dim, m), IndexFactory::construct_graphstore( GraphStoreStrategy::MEMORY, - max_points + (dynamic_index && num_frozen_pts == 0 ? (size_t)1 : num_frozen_pts), + (max_points == 0? (size_t)1 : max_points) + (dynamic_index && num_frozen_pts == 0 ? (size_t)1 : num_frozen_pts), (size_t)((index_parameters == nullptr ? 0 : index_parameters->max_degree) * defaults::GRAPH_SLACK_FACTOR * 1.05))) { @@ -273,7 +286,7 @@ void Index::save(const char *filename, bool compact_before_save if (compact_before_save) { - compact_data(); + compact_data(true); compact_frozen_point(); } else @@ -379,9 +392,107 @@ void Index::save(const char *filename, bool compact_before_save } else { - diskann::cout << "Save index in a single file currently not supported. " - "Not saving the index." - << std::endl; + if (_filtered_index) + { + diskann::cout << "Save index in a single file currently not supported for filtered index. " + "Not saving the index." + << std::endl; + } + else + { + if (_save_as_one_file_version == 1) + { + std::ofstream writer; + open_file_to_write(writer, filename); + + // Save version. + writer.write((char *)&_save_as_one_file_version, sizeof(uint64_t)); + size_t curr_pos = sizeof(uint64_t); + + // Placeholder for metadata. + // This will be filled at end; + SaveLoadMetaDataV1 metadata; + const size_t meta_data_start = curr_pos; + curr_pos += sizeof(SaveLoadMetaDataV1); + + // Save data. + { + metadata.data_offset = static_cast(curr_pos); + curr_pos += _data_store->save(writer, (location_t)(_nd + _num_frozen_pts), curr_pos); + } + + // Save delete list. + { + metadata.delete_list_offset = static_cast(curr_pos); + + if (_delete_set->size() != 0) + { + std::unique_ptr delete_list = std::make_unique(_delete_set->size()); + uint32_t i = 0; + for (auto &del : *_delete_set) + { + delete_list[i++] = del; + } + curr_pos += save_bin(writer, delete_list.get(), _delete_set->size(), 1, curr_pos); + } + } + + // Save tags. + { + metadata.tags_offset = static_cast(curr_pos); + + if (_enable_tags) + { + TagT *tag_data = new TagT[_nd + _num_frozen_pts]; + for (uint32_t i = 0; i < _nd; i++) + { + TagT tag; + if (_location_to_tag.try_get(i, tag)) + { + tag_data[i] = tag; + } + else + { + // catering to future when tagT can be any type. + std::memset((char *)&tag_data[i], 0, sizeof(TagT)); + } + } + if (_num_frozen_pts > 0) + { + std::memset((char *)&tag_data[_start], 0, sizeof(TagT) * _num_frozen_pts); + } + + curr_pos += save_bin(writer, tag_data, _nd + _num_frozen_pts, 1, curr_pos); + delete[] tag_data; + } + } + + // Save graph. + { + metadata.graph_offset = static_cast(curr_pos); + _graph_store->store(writer, _nd + _num_frozen_pts, _num_frozen_pts, _start, curr_pos); + } + + // Save metadata. + { + writer.seekp(meta_data_start, writer.beg); + writer.write((char *)&metadata, sizeof(SaveLoadMetaDataV1)); + } + + writer.close(); + + diskann::cout << "Metadata Saved. data_offset: " << std::to_string(metadata.data_offset) + << " delete_list_offset: " << std::to_string(metadata.delete_list_offset) + << " tag_offset: " << std::to_string(metadata.tags_offset) + << " graph_offset: " << std::to_string(metadata.graph_offset) << std::endl; + } + else + { + diskann::cout << "Save index in a single file currently only support _save_as_one_file_version = 1. " + "Not saving the index." + << std::endl; + } + } } // If frozen points were temporarily compacted to _nd, move back to @@ -393,17 +504,16 @@ void Index::save(const char *filename, bool compact_before_save #ifdef EXEC_ENV_OLS template -size_t Index::load_tags(AlignedFileReader &reader) +size_t Index::load_tags(AlignedFileReader &reader, size_t offset) { #else template -size_t Index::load_tags(const std::string tag_filename) +size_t Index::load_tags(const std::string &filename, size_t offset) { - if (_enable_tags && !file_exists(tag_filename)) + if (_enable_tags && !file_exists(filename)) { - diskann::cerr << "Tag file " << tag_filename << " does not exist!" << std::endl; - throw diskann::ANNException("Tag file " + tag_filename + " does not exist!", -1, __FUNCSIG__, __FILE__, - __LINE__); + diskann::cerr << "Tag file " << filename << " does not exist!" << std::endl; + throw diskann::ANNException("Tag file " + filename + " does not exist!", -1, __FUNCSIG__, __FILE__, __LINE__); } #endif if (!_enable_tags) @@ -415,9 +525,9 @@ size_t Index::load_tags(const std::string tag_filename) size_t file_dim, file_num_points; TagT *tag_data; #ifdef EXEC_ENV_OLS - load_bin(reader, tag_data, file_num_points, file_dim); + load_bin(reader, tag_data, file_num_points, file_dim, offset); #else - load_bin(std::string(tag_filename), tag_data, file_num_points, file_dim); + load_bin(std::string(filename), tag_data, file_num_points, file_dim, offset); #endif if (file_dim != 1) @@ -449,15 +559,15 @@ size_t Index::load_tags(const std::string tag_filename) template #ifdef EXEC_ENV_OLS -size_t Index::load_data(AlignedFileReader &reader) +size_t Index::load_data(AlignedFileReader &reader, size_t offset) { #else -size_t Index::load_data(std::string filename) +size_t Index::load_data(std::string filename, size_t offset) { #endif size_t file_dim, file_num_points; #ifdef EXEC_ENV_OLS - diskann::get_bin_metadata(reader, file_num_points, file_dim); + diskann::get_bin_metadata(reader, file_num_points, file_dim, offset); #else if (!file_exists(filename)) { @@ -466,7 +576,7 @@ size_t Index::load_data(std::string filename) diskann::cerr << stream.str() << std::endl; throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } - diskann::get_bin_metadata(filename, file_num_points, file_dim); + diskann::get_bin_metadata(filename, file_num_points, file_dim, offset); #endif // since we are loading a new dataset, _empty_slots must be cleared @@ -480,7 +590,6 @@ size_t Index::load_data(std::string filename) diskann::cerr << stream.str() << std::endl; throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } - if (file_num_points > _max_points + _num_frozen_pts) { // update and tag lock acquired in load() before calling load_data @@ -490,29 +599,29 @@ size_t Index::load_data(std::string filename) #ifdef EXEC_ENV_OLS // REFACTOR TODO: Must figure out how to support aligned reader in a clean // manner. - copy_aligned_data_from_file(reader, _data, file_num_points, file_dim, _data_store->get_aligned_dim()); + _data_store->load(reader, offset); // offset == 0. #else - _data_store->load(filename); // offset == 0. + _data_store->load(filename, offset); // offset == 0. #endif return file_num_points; } #ifdef EXEC_ENV_OLS template -size_t Index::load_delete_set(AlignedFileReader &reader) +size_t Index::load_delete_set(AlignedFileReader &reader, size_t offset) { #else template -size_t Index::load_delete_set(const std::string &filename) +size_t Index::load_delete_set(const std::string &filename, size_t offset) { #endif std::unique_ptr delete_list; size_t npts, ndim; #ifdef EXEC_ENV_OLS - diskann::load_bin(reader, delete_list, npts, ndim); + diskann::load_bin(reader, delete_list, npts, ndim, offset); #else - diskann::load_bin(filename, delete_list, npts, ndim); + diskann::load_bin(filename, delete_list, npts, ndim, offset); #endif assert(ndim == 1); for (uint32_t i = 0; i < npts; i++) @@ -528,6 +637,7 @@ template #ifdef EXEC_ENV_OLS void Index::load(AlignedFileReader &reader, uint32_t num_threads, uint32_t search_l) { + IOContext &ctx = reader.get_ctx(); #else void Index::load(const char *filename, uint32_t num_threads, uint32_t search_l) { @@ -546,7 +656,7 @@ void Index::load(const char *filename, uint32_t num_threads, ui std::string labels_to_medoids = mem_index_file + "_labels_to_medoids.txt"; std::string labels_map_file = mem_index_file + "_labels_map.txt"; #endif - if (!_save_as_one_file) + if (!_load_from_one_file) { // For DLVS Store, we will not support saving the index in multiple // files. @@ -569,10 +679,116 @@ void Index::load(const char *filename, uint32_t num_threads, ui } else { - diskann::cout << "Single index file saving/loading support not yet " - "enabled. Not loading the index." - << std::endl; - return; + if (_filtered_index) + { + diskann::cout << "Single index file saving/loading support for filtered index is not yet " + "enabled. Not loading the index." + << std::endl; + } + else + { + uint64_t version = 0; + +#ifdef EXEC_ENV_OLS + std::vector readReqs; + AlignedRead readReq; + uint8_t buf[sizeof(uint64_t)] = {}; + + readReq.buf = (void *) buf; + readReq.offset = 0; + readReq.len = sizeof(uint64_t); + readReqs.push_back(readReq); + reader.read(readReqs, ctx); // synchronous + + if ((*(ctx.m_pRequestsStatus.get()))[0] == IOContext::READ_SUCCESS) + { + memcpy((void *)&version, (void *)buf, sizeof(uint64_t)); + } + else + { + std::stringstream str; + str << "Could not read binary metadata from index file at offset: 0." << std::endl; + throw diskann::ANNException(str.str(), -1, __FUNCSIG__, __FILE__, __LINE__); + } + +#else + std::ifstream reader(filename, std::ios::binary); + reader.read((char *)&version, sizeof(uint64_t)); +#endif + + if (version == _load_from_one_file_version) + { + SaveLoadMetaDataV1 metadata; +#ifdef EXEC_ENV_OLS + std::vector metadata_readReqs; + AlignedRead metadata_readReq; + uint8_t metadata_buf[sizeof(SaveLoadMetaDataV1)] = {}; + + metadata_readReq.buf = (void*) metadata_buf; + metadata_readReq.offset = sizeof(version); + metadata_readReq.len = sizeof(SaveLoadMetaDataV1); + metadata_readReqs.push_back(metadata_readReq); + reader.read(metadata_readReqs, ctx); // synchronous + if ((*(ctx.m_pRequestsStatus))[0] == IOContext::READ_SUCCESS) + { + memcpy((void *)&metadata, (void *)metadata_buf, sizeof(SaveLoadMetaDataV1)); + } + + diskann::cout << "Metadata loaded. data_offset: " << std::to_string(metadata.data_offset) + << " delete_list_offset: " << std::to_string(metadata.delete_list_offset) + << " tag_offset: " << std::to_string(metadata.tags_offset) + << " graph_offset: " << std::to_string(metadata.graph_offset) + << std::endl; + +#else + reader.read((char *)&metadata, sizeof(SaveLoadMetaDataV1)); +#endif + // Load data +#ifdef EXEC_ENV_OLS + data_file_num_pts = load_data(reader, metadata.data_offset); + +#else + data_file_num_pts = load_data(filename, metadata.data_offset); +#endif + + // Load delete list when presents. + if (metadata.delete_list_offset != metadata.tags_offset) + { +#ifdef EXEC_ENV_OLS + load_delete_set(reader, metadata.delete_list_offset); +#else + load_delete_set(filename, metadata.delete_list_offset); +#endif + } + + // Load tags when presents. + if (metadata.tags_offset != metadata.graph_offset) + { +#ifdef EXEC_ENV_OLS + tags_file_num_pts = load_tags(reader, metadata.tags_offset); +#else + tags_file_num_pts = load_tags(filename, metadata.tags_offset); +#endif + } + // Load graph +#ifdef EXEC_ENV_OLS + + graph_num_pts = load_graph(reader, data_file_num_pts, metadata.graph_offset); +#else + graph_num_pts = load_graph(filename, data_file_num_pts, metadata.graph_offset); +#endif + } + else + { + std::stringstream stream; + stream << "load index from a single file currently only support _save_as_one_file_version = 1 and _load_as_one_file_version = 1. " + << "Not loading the index." + << std::endl; + diskann::cerr << stream.str() << std::endl; + + throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); + } + } } if (data_file_num_pts != graph_num_pts || (data_file_num_pts != tags_file_num_pts && _enable_tags)) @@ -584,6 +800,8 @@ void Index::load(const char *filename, uint32_t num_threads, ui diskann::cerr << stream.str() << std::endl; throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } + + #ifndef EXEC_ENV_OLS if (file_exists(labels_file)) { @@ -631,6 +849,7 @@ void Index::load(const char *filename, uint32_t num_threads, ui } } #endif + _nd = data_file_num_pts - _num_frozen_pts; _empty_slots.clear(); _empty_slots.reserve(_max_points); @@ -679,15 +898,17 @@ size_t Index::get_graph_num_frozen_points(const std::string &gr #ifdef EXEC_ENV_OLS template -size_t Index::load_graph(AlignedFileReader &reader, size_t expected_num_points) +size_t Index::load_graph(AlignedFileReader &reader, size_t expected_num_points, size_t offset) { + auto res = _graph_store->load(reader, expected_num_points, offset); + #else template -size_t Index::load_graph(std::string filename, size_t expected_num_points) +size_t Index::load_graph(std::string filename, size_t expected_num_points, size_t offset) { + auto res = _graph_store->load(filename, expected_num_points, offset); #endif - auto res = _graph_store->load(filename, expected_num_points); _start = std::get<1>(res); _num_frozen_pts = std::get<2>(res); return std::get<0>(res); @@ -1793,7 +2014,7 @@ void Index::build(const std::string &data_file, const size_t nu this->build_filtered_index(data_file.c_str(), labels_file_to_use, points_to_load); } std::chrono::duration diff = std::chrono::high_resolution_clock::now() - s; - std::cout << "Indexing time: " << diff.count() << "\n"; + diskann::cout << "Indexing time: " << diff.count() << "\n"; } template @@ -2275,6 +2496,12 @@ template size_t Index size_t Index::get_num_deleted_points() +{ + std::shared_lock dl(_delete_lock); + return _delete_set->size(); +} + template void Index::generate_frozen_point() { if (_num_frozen_pts == 0) @@ -2528,7 +2755,7 @@ template void Index void Index::compact_data() +template void Index::compact_data(bool forced) { if (!_dynamic_index) throw ANNException("Can not compact a non-dynamic index", -1, __FUNCSIG__, __FILE__, __LINE__); @@ -2536,7 +2763,11 @@ template void Indexsize() > 0) @@ -2898,15 +3129,7 @@ int Index::insert_point(const T *point, const TagT tag) template int Index::insert_point(const T *point, const TagT tag, const std::vector &labels) { - assert(_has_built); - if (tag == static_cast(0)) - { - throw diskann::ANNException("Do not insert point with tag 0. That is " - "reserved for points hidden " - "from the user.", - -1, __FUNCSIG__, __FILE__, __LINE__); - } std::shared_lock shared_ul(_update_lock); std::unique_lock tl(_tag_lock); diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index c9b2c0ebb..33867d4be 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -1123,7 +1123,7 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons { uint64_t dumr, dumc; float *norm_val; - diskann::load_bin(files, norm_val, dumr, dumc); + diskann::load_bin(files, norm_file, norm_val, dumr, dumc); #else if (file_exists(norm_file) && metric == diskann::Metric::INNER_PRODUCT) { diff --git a/src/utils.cpp b/src/utils.cpp index b675e656d..ab36c42a8 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -285,6 +285,7 @@ template void load_bin(AlignedFileReader &reader, T *&data, size_t { // Code assumes that the reader is already setup correctly. get_bin_metadata(reader, npts, ndim, offset); + data = new T[npts * ndim]; size_t data_size = npts * ndim * sizeof(T); @@ -333,7 +334,7 @@ void copy_aligned_data_from_file(AlignedFileReader &reader, T *&data, size_t &np { if (data == nullptr) { - diskann::cerr << "Memory was not allocated for " << data << " before calling the load function. Exiting..." + diskann::cout << "Memory was not allocated for " << data << " before calling the load function. Exiting..." << std::endl; throw diskann::ANNException("Null pointer passed to copy_aligned_data_from_file()", -1, __FUNCSIG__, __FILE__, __LINE__); @@ -391,29 +392,31 @@ template void read_array(AlignedFileReader &reader, T *data, size_t if (data == nullptr) { throw diskann::ANNException("read_array requires an allocated buffer.", -1); - if (size * sizeof(T) > MAX_REQUEST_SIZE) - { - std::stringstream ss; - ss << "Cannot read more than " << MAX_REQUEST_SIZE - << " bytes. Current request size: " << std::to_string(size) << " sizeof(T): " << sizeof(T) << std::endl; - throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__); - } - std::vector read_requests; - AlignedRead read_req; - read_req.buf = data; - read_req.len = size * sizeof(T); - read_req.offset = offset; - read_requests.push_back(read_req); - IOContext &ctx = reader.get_ctx(); - reader.read(read_requests, ctx); + } - if ((*(ctx.m_pRequestsStatus))[0] != IOContext::READ_SUCCESS) - { - std::stringstream ss; - ss << "Failed to read_array() of size: " << size * sizeof(T) << " at offset: " << offset << " from reader. " - << std::endl; - throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__); - } + if (size * sizeof(T) > MAX_REQUEST_SIZE) + { + std::stringstream ss; + ss << "Cannot read more than " << MAX_REQUEST_SIZE + << " bytes. Current request size: " << std::to_string(size) << " sizeof(T): " << sizeof(T) << std::endl; + throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__); + } + + std::vector read_requests; + AlignedRead read_req; + read_req.buf = data; + read_req.len = size * sizeof(T); + read_req.offset = offset; + read_requests.push_back(read_req); + IOContext &ctx = reader.get_ctx(); + reader.read(read_requests, ctx); + + if ((*(ctx.m_pRequestsStatus))[0] != IOContext::READ_SUCCESS) + { + std::stringstream ss; + ss << "Failed to read_array() of size: " << size * sizeof(T) << " at offset: " << offset << " from reader. " + << std::endl; + throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } }