From 74a30fe1024691905c2c48be3c7571b9c691e5ff Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 08:47:15 +1000 Subject: [PATCH 1/4] Make ObjectFetcher generic rather than dyn trait, use anyhow --- Cargo.lock | 1 + crates/subspace-gateway-rpc/src/lib.rs | 28 ++++++++++++++----- crates/subspace-gateway/src/commands/run.rs | 3 +- .../subspace-gateway/src/commands/run/rpc.rs | 12 +++++--- crates/subspace-gateway/src/piece_getter.rs | 4 +-- shared/subspace-data-retrieval/Cargo.toml | 1 + .../src/object_fetcher.rs | 27 ++++++++++-------- .../src/piece_fetcher.rs | 4 +-- .../src/piece_getter.rs | 12 +++----- 9 files changed, 56 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53b3c329e9..b26117bc90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12469,6 +12469,7 @@ dependencies = [ name = "subspace-data-retrieval" version = "0.1.0" dependencies = [ + "anyhow", "async-lock 3.4.0", "async-trait", "futures", diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs index af04fed933..570d2db0e3 100644 --- a/crates/subspace-gateway-rpc/src/lib.rs +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -8,6 +8,7 @@ use std::ops::{Deref, DerefMut}; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; use subspace_core_primitives::objects::GlobalObjectMapping; use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use tracing::debug; const SUBSPACE_ERROR: i32 = 9000; @@ -99,22 +100,32 @@ pub trait SubspaceGatewayRpcApi { #[method(name = "subspace_fetchObject")] async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error>; } + /// Subspace Gateway RPC configuration -pub struct SubspaceGatewayRpcConfig { +pub struct SubspaceGatewayRpcConfig +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// DSN object fetcher instance. - pub object_fetcher: ObjectFetcher, + pub object_fetcher: ObjectFetcher, } /// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway. -pub struct SubspaceGatewayRpc { +pub struct SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// DSN object fetcher instance. - object_fetcher: ObjectFetcher, + object_fetcher: ObjectFetcher, } /// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN. -impl SubspaceGatewayRpc { +impl SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// Creates a new instance of the `SubspaceGatewayRpc` handler. - pub fn new(config: SubspaceGatewayRpcConfig) -> Self { + pub fn new(config: SubspaceGatewayRpcConfig) -> Self { Self { object_fetcher: config.object_fetcher, } @@ -122,7 +133,10 @@ impl SubspaceGatewayRpc { } #[async_trait] -impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc { +impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error> { // TODO: deny unsafe RPC calls diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index ace0f5333b..89763055f6 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -15,6 +15,7 @@ use futures::{select, FutureExt}; use std::env; use std::num::NonZeroUsize; use std::pin::pin; +use std::sync::Arc; use subspace_core_primitives::pieces::Record; use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_erasure_coding::ErasureCoding; @@ -95,7 +96,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { dsn_node.clone(), SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg), ); - let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size)); + let object_fetcher = ObjectFetcher::new(Arc::new(piece_getter), erasure_coding, Some(max_size)); let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs index a7e58bb234..99a504ce59 100644 --- a/crates/subspace-gateway/src/commands/run/rpc.rs +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -3,6 +3,7 @@ use clap::Parser; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer}; use tracing::info; @@ -27,10 +28,13 @@ pub(crate) struct RpcOptions { // - add an argument for a custom tokio runtime // - move this RPC code into a new library part of this crate // - make a RPC config that is independent of clap -pub async fn launch_rpc_server( - rpc_api: SubspaceGatewayRpc, - rpc_options: RpcOptions

, -) -> anyhow::Result { +pub async fn launch_rpc_server( + rpc_api: SubspaceGatewayRpc, + rpc_options: RpcOptions, +) -> anyhow::Result +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ let server = ServerBuilder::default() .build(rpc_options.rpc_listen_on) .await?; diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs index ddf6cf86ef..8d90667f65 100644 --- a/crates/subspace-gateway/src/piece_getter.rs +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -5,7 +5,7 @@ use futures::stream::StreamExt; use std::fmt; use std::ops::{Deref, DerefMut}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; -use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; use subspace_networking::Node; @@ -51,7 +51,7 @@ impl ObjectPieceGetter for DsnPieceGetter where PV: PieceValidator, { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if let Some((got_piece_index, maybe_piece)) = self.get_from_cache([piece_index]).await.next().await { diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 6c5ec9e6f9..55119acd13 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -12,6 +12,7 @@ include = [ ] [dependencies] +anyhow = "1.0.89" async-lock = "3.4.0" async-trait = "0.1.83" futures = "0.3.31" diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 4968c86739..d2818cf844 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -16,7 +16,7 @@ //! Fetching objects stored in the archived history of Subspace Network. use crate::piece_fetcher::download_pieces; -use crate::piece_getter::{BoxError, ObjectPieceGetter}; +use crate::piece_getter::ObjectPieceGetter; use crate::segment_fetcher::{download_segment, SegmentGetterError}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use std::sync::Arc; @@ -123,7 +123,7 @@ pub enum Error { #[error("Getting piece caused an error: {source:?}")] PieceGetterError { #[from] - source: BoxError, + source: anyhow::Error, }, /// Piece getter couldn't find the piece @@ -132,9 +132,12 @@ pub enum Error { } /// Object fetcher for the Subspace DSN. -pub struct ObjectFetcher { +pub struct ObjectFetcher +where + PG: ObjectPieceGetter + Send + Sync, +{ /// The piece getter used to fetch pieces. - piece_getter: Arc, + piece_getter: Arc, /// The erasure coding configuration of those pieces. erasure_coding: ErasureCoding, @@ -143,21 +146,21 @@ pub struct ObjectFetcher { max_object_len: usize, } -impl ObjectFetcher { +impl ObjectFetcher +where + PG: ObjectPieceGetter + Send + Sync, +{ /// Create a new object fetcher with the given configuration. /// /// `max_object_len` is the amount of data bytes we'll read for a single object before giving /// up and returning an error, or `None` for no limit (`usize::MAX`). - pub fn new( - piece_getter: PG, + pub fn new( + piece_getter: Arc, erasure_coding: ErasureCoding, max_object_len: Option, - ) -> Self - where - PG: ObjectPieceGetter + Send + Sync + 'static, - { + ) -> Self { Self { - piece_getter: Arc::new(piece_getter), + piece_getter, erasure_coding, max_object_len: max_object_len.unwrap_or(usize::MAX), } diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index 14f6e823ae..afe1680160 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -16,7 +16,7 @@ //! Fetching pieces of the archived history of Subspace Network. use crate::object_fetcher::Error; -use crate::piece_getter::{BoxError, ObjectPieceGetter}; +use crate::piece_getter::ObjectPieceGetter; use futures::stream::FuturesOrdered; use futures::TryStreamExt; use subspace_core_primitives::pieces::{Piece, PieceIndex}; @@ -31,7 +31,7 @@ use tracing::{debug, trace}; pub async fn download_pieces( piece_indexes: &[PieceIndex], piece_getter: &PG, -) -> Result, BoxError> +) -> anyhow::Result> where PG: ObjectPieceGetter, { diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs index 70f470de70..3dd0de4c53 100644 --- a/shared/subspace-data-retrieval/src/piece_getter.rs +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -21,18 +21,14 @@ use std::sync::Arc; use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::pieces::{Piece, PieceIndex}; -/// A type-erased error -pub type BoxError = Box; - /// Trait representing a way to get pieces from the DSN for object reconstruction -// TODO: make ObjectPieceGetter impls retry before failing, if that is useful #[async_trait] pub trait ObjectPieceGetter: fmt::Debug { /// Get piece by index. /// /// Returns `Ok(None)` if the piece is not found. /// Returns `Err(_)` if trying to get the piece caused an error. - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError>; + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result>; } #[async_trait] @@ -40,7 +36,7 @@ impl ObjectPieceGetter for Arc where T: ObjectPieceGetter + Send + Sync + ?Sized, { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { self.as_ref().get_piece(piece_index).await } } @@ -48,7 +44,7 @@ where // Convenience methods, mainly used in testing #[async_trait] impl ObjectPieceGetter for NewArchivedSegment { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if piece_index.segment_index() == self.segment_header.segment_index() { return Ok(Some( self.pieces @@ -64,7 +60,7 @@ impl ObjectPieceGetter for NewArchivedSegment { #[async_trait] impl ObjectPieceGetter for (PieceIndex, Piece) { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if self.0 == piece_index { return Ok(Some(self.1.clone())); } From 01750d29fea18869f53d7c5302bbd0b019de699f Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 09:43:25 +1000 Subject: [PATCH 2/4] Add ObjectPieceGetter::get_pieces() --- crates/subspace-gateway/src/piece_getter.rs | 17 ++++ .../src/piece_getter.rs | 78 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs index 8d90667f65..a3172afc6b 100644 --- a/crates/subspace-gateway/src/piece_getter.rs +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use futures::stream::StreamExt; +use futures::Stream; use std::fmt; use std::ops::{Deref, DerefMut}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; @@ -64,6 +65,22 @@ where Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + Ok(Box::new( + self.get_from_cache(piece_indices) + .await + .map(|(index, maybe_piece)| (index, Ok(maybe_piece))), + )) + } } impl DsnPieceGetter diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs index 3dd0de4c53..c2d58982ab 100644 --- a/shared/subspace-data-retrieval/src/piece_getter.rs +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -16,7 +16,9 @@ //! Getting object pieces from the Subspace Distributed Storage Network, or various caches. use async_trait::async_trait; +use futures::{stream, Stream, StreamExt}; use std::fmt; +use std::future::Future; use std::sync::Arc; use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::pieces::{Piece, PieceIndex}; @@ -29,6 +31,19 @@ pub trait ObjectPieceGetter: fmt::Debug { /// Returns `Ok(None)` if the piece is not found. /// Returns `Err(_)` if trying to get the piece caused an error. async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result>; + + /// Get pieces with provided indices. + /// + /// The number of elements in the returned stream is the same as the number of unique + /// `piece_indices`. + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a; } #[async_trait] @@ -39,6 +54,18 @@ where async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { self.as_ref().get_piece(piece_index).await } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + self.as_ref().get_pieces(piece_indices).await + } } // Convenience methods, mainly used in testing @@ -56,6 +83,18 @@ impl ObjectPieceGetter for NewArchivedSegment { Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices) + } } #[async_trait] @@ -67,4 +106,43 @@ impl ObjectPieceGetter for (PieceIndex, Piece) { Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices) + } +} + +/// A default implementation which gets each piece individually, using the `get_piece` async +/// function. +/// +/// This is mainly used for testing, most production implementations can fetch multiple pieces more +/// efficiently. +#[expect(clippy::type_complexity, reason = "type matches trait signature")] +pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>( + // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result> once it stabilises + // https://github.com/rust-lang/rust/issues/62290 + get_piece: Func, + piece_indices: PieceIndices, +) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, +> +where + PieceIndices: IntoIterator + Send + 'a, + Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a, + Fut: Future>> + Send + Unpin + 'a, +{ + Ok(Box::new(Box::pin(stream::iter(piece_indices).then( + move |piece_index| { + let get_piece = get_piece.clone(); + async move { (piece_index, get_piece(piece_index).await) } + }, + )))) } From 170588b8486261868d1dc4cb28b69ec1f18c60db Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 11:20:20 +1000 Subject: [PATCH 3/4] Fetch multiple pieces in the piece fetcher --- .../src/piece_fetcher.rs | 39 ++++--------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index afe1680160..359c09fd9a 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -17,8 +17,7 @@ use crate::object_fetcher::Error; use crate::piece_getter::ObjectPieceGetter; -use futures::stream::FuturesOrdered; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use tracing::{debug, trace}; @@ -42,40 +41,18 @@ where ); // TODO: - // - consider using a semaphore to limit the number of concurrent requests, like - // download_segment_pieces() // - if we're close to the number of pieces in a segment, use segment downloading and piece // reconstruction instead // Currently most objects are limited to 4 pieces, so this isn't needed yet. - let received_pieces = piece_indexes - .iter() - .map(|piece_index| async move { - match piece_getter.get_piece(*piece_index).await { - Ok(Some(piece)) => { - trace!(?piece_index, "Piece request succeeded",); - Ok(piece) - } - Ok(None) => { - trace!(?piece_index, "Piece not found"); - Err(Error::PieceNotFound { - piece_index: *piece_index, - } - .into()) - } - Err(error) => { - trace!( - %error, - ?piece_index, - "Piece request caused an error", - ); - Err(error) - } - } - }) - .collect::>(); + let received_pieces = piece_getter + .get_pieces(piece_indexes.iter().copied()) + .await?; // We want exact pieces, so any errors are fatal. - let received_pieces: Vec = received_pieces.try_collect().await?; + let received_pieces: Vec = received_pieces + .map(|(piece_index, maybe_piece)| maybe_piece?.ok_or(Error::PieceNotFound { piece_index })) + .try_collect() + .await?; trace!( count = piece_indexes.len(), From f488e3aa3d0c2864e92884b5486e4f8daa8b84df Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Oct 2024 11:20:41 +1000 Subject: [PATCH 4/4] Rewrite the segment fetcher to fetch multiple pieces --- Cargo.lock | 1 - shared/subspace-data-retrieval/Cargo.toml | 1 - .../src/segment_fetcher.rs | 134 ++++++++++-------- 3 files changed, 75 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b26117bc90..a4e75240bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12470,7 +12470,6 @@ name = "subspace-data-retrieval" version = "0.1.0" dependencies = [ "anyhow", - "async-lock 3.4.0", "async-trait", "futures", "parity-scale-codec", diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 55119acd13..73eb748d12 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -13,7 +13,6 @@ include = [ [dependencies] anyhow = "1.0.89" -async-lock = "3.4.0" async-trait = "0.1.83" futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } diff --git a/shared/subspace-data-retrieval/src/segment_fetcher.rs b/shared/subspace-data-retrieval/src/segment_fetcher.rs index 686f5e7290..679039977a 100644 --- a/shared/subspace-data-retrieval/src/segment_fetcher.rs +++ b/shared/subspace-data-retrieval/src/segment_fetcher.rs @@ -16,9 +16,7 @@ //! Fetching segments of the archived history of Subspace Network. use crate::piece_getter::ObjectPieceGetter; -use async_lock::Semaphore; -use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{stream, StreamExt}; use subspace_archiving::archiver::Segment; use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError}; use subspace_core_primitives::pieces::Piece; @@ -72,9 +70,6 @@ where } /// Concurrently downloads the pieces for `segment_index`. -// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks(): -// -// // TODO: pass a lower concurrency limit into this function, to avoid overwhelming residential routers or slow connections pub async fn download_segment_pieces( segment_index: SegmentIndex, @@ -85,66 +80,87 @@ where { debug!(%segment_index, "Retrieving pieces of the segment"); - let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS); - - let mut received_segment_pieces = segment_index - .segment_piece_indexes_source_first() - .into_iter() - .map(|piece_index| { - // Source pieces will acquire permit here right away - let maybe_permit = semaphore.try_acquire(); - - async move { - let permit = match maybe_permit { - Some(permit) => permit, - None => { - // Other pieces will acquire permit here instead - semaphore.acquire().await - } - }; - let piece = match piece_getter.get_piece(piece_index).await { - Ok(Some(piece)) => piece, - Ok(None) => { - trace!(?piece_index, "Piece not found"); - return None; - } - Err(error) => { - trace!( - %error, - ?piece_index, - "Piece request failed", - ); - return None; - } - }; - - trace!(?piece_index, "Piece request succeeded"); - - // Piece was received successfully, "remove" this slot from semaphore - permit.forget(); - Some((piece_index, piece)) - } - }) - .collect::>(); - + // We want NUM_RAW_RECORDS pieces to reconstruct the segment, but it doesn't matter exactly which ones. + let piece_indexes = segment_index.segment_piece_indexes_source_first(); + let mut piece_indexes = piece_indexes.as_slice(); let mut segment_pieces = vec![None::; ArchivedHistorySegment::NUM_PIECES]; + + let mut pieces_pending = 0; let mut pieces_received = 0; + let mut piece_streams = Vec::new(); + + // Loop Invariant: + // - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or + // - the number of pending pieces gets smaller, eventually triggering another batch. + // We also exit early if we have enough pieces to reconstruct a segment. + 'fetcher: while !piece_indexes.is_empty() + && pieces_received < RecordedHistorySegment::NUM_RAW_RECORDS + { + // Request the number of pieces needed to reconstruct the segment, assuming all pending + // pieces are successful. + let batch_size = RecordedHistorySegment::NUM_RAW_RECORDS - pieces_received - pieces_pending; + if batch_size > 0 { + let (batch_indexes, remaining_piece_indexes) = piece_indexes + .split_at_checked(batch_size) + .unwrap_or((piece_indexes, &[])); + // Invariant: the number of remaining piece indexes gets smaller. + piece_indexes = remaining_piece_indexes; + + let stream = piece_getter.get_pieces(batch_indexes.iter().cloned()).await; + match stream { + Ok(stream) => { + piece_streams.push(stream); + pieces_pending += batch_size; + } + Err(error) => { + // A single batch failure isn't fatal. + debug!( + ?error, + %segment_index, + batch_size, + pieces_pending, + pieces_received, + pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS, + "Segment piece getter batch failed" + ); + } + } + } - while let Some(maybe_piece) = received_segment_pieces.next().await { - let Some((piece_index, piece)) = maybe_piece else { - continue; + // Poll all the batches concurrently, getting all finished pieces. + let piece_responses = stream::iter(&mut piece_streams) + .flatten_unordered(None) + .ready_chunks(RecordedHistorySegment::NUM_RAW_RECORDS) + .next() + .await; + + let Some(piece_responses) = piece_responses else { + // All streams have finished, perhaps abnormally, so reset the number of pending pieces. + // Invariant: the number of pending pieces gets smaller. + pieces_pending = 0; + continue 'fetcher; }; - segment_pieces - .get_mut(piece_index.position() as usize) - .expect("Piece position is by definition within segment; qed") - .replace(piece); + // Process the piece responses. + 'processor: for maybe_piece in piece_responses { + // Invariant: the number of pending pieces gets smaller. + pieces_pending -= 1; + + let (piece_index, Ok(Some(piece))) = maybe_piece else { + continue 'processor; + }; - pieces_received += 1; + segment_pieces + .get_mut(piece_index.position() as usize) + .expect("Piece position is by definition within segment; qed") + .replace(piece); - if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS { - trace!(%segment_index, "Received half of the segment."); - break; + pieces_received += 1; + + if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS { + trace!(%segment_index, "Received half of the segment."); + break 'fetcher; + } } }