Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch multiple pieces during object reconstruction #3158

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 21 additions & 7 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::ops::{Deref, DerefMut};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use tracing::debug;

const SUBSPACE_ERROR: i32 = 9000;
Expand Down Expand Up @@ -99,30 +100,43 @@ pub trait SubspaceGatewayRpcApi {
#[method(name = "subspace_fetchObject")]
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error>;
}

/// Subspace Gateway RPC configuration
pub struct SubspaceGatewayRpcConfig {
pub struct SubspaceGatewayRpcConfig<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
pub object_fetcher: ObjectFetcher,
pub object_fetcher: ObjectFetcher<PG>,
}

/// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway.
pub struct SubspaceGatewayRpc {
pub struct SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
object_fetcher: ObjectFetcher,
object_fetcher: ObjectFetcher<PG>,
}

/// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN.
impl SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// Creates a new instance of the `SubspaceGatewayRpc` handler.
pub fn new(config: SubspaceGatewayRpcConfig) -> Self {
pub fn new(config: SubspaceGatewayRpcConfig<PG>) -> Self {
Self {
object_fetcher: config.object_fetcher,
}
}
}

#[async_trait]
impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpcApiServer for SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error> {
// TODO: deny unsafe RPC calls
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: remove these TODOs, we might not need them


Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::{select, FutureExt};
use std::env;
use std::num::NonZeroUsize;
use std::pin::pin;
use std::sync::Arc;
use subspace_core_primitives::pieces::Record;
use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_erasure_coding::ErasureCoding;
Expand Down Expand Up @@ -95,7 +96,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
dsn_node.clone(),
SegmentCommitmentPieceValidator::new(dsn_node, node_client, kzg),
);
let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size));
let object_fetcher = ObjectFetcher::new(Arc::new(piece_getter), erasure_coding, Some(max_size));

let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher });
let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?;
Expand Down
12 changes: 8 additions & 4 deletions crates/subspace-gateway/src/commands/run/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use clap::Parser;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer};
use tracing::info;

Expand All @@ -27,10 +28,13 @@ pub(crate) struct RpcOptions<const DEFAULT_PORT: u16> {
// - add an argument for a custom tokio runtime
// - move this RPC code into a new library part of this crate
// - make a RPC config that is independent of clap
pub async fn launch_rpc_server<const P: u16>(
rpc_api: SubspaceGatewayRpc,
rpc_options: RpcOptions<P>,
) -> anyhow::Result<ServerHandle> {
pub async fn launch_rpc_server<PG, const DEFAULT_PORT: u16>(
rpc_api: SubspaceGatewayRpc<PG>,
rpc_options: RpcOptions<DEFAULT_PORT>,
) -> anyhow::Result<ServerHandle>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
let server = ServerBuilder::default()
.build(rpc_options.rpc_listen_on)
.await?;
Expand Down
21 changes: 19 additions & 2 deletions crates/subspace-gateway/src/piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::Stream;
use std::fmt;
use std::ops::{Deref, DerefMut};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;

Expand Down Expand Up @@ -51,7 +52,7 @@ impl<PV> ObjectPieceGetter for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
async fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, BoxError> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
if let Some((got_piece_index, maybe_piece)) =
self.get_from_cache([piece_index]).await.next().await
{
Expand All @@ -64,6 +65,22 @@ where

Ok(None)
}

async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
Ok(Box::new(
self.get_from_cache(piece_indices)
.await
.map(|(index, maybe_piece)| (index, Ok(maybe_piece))),
Comment on lines +79 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably worth adding a TODO here: this is supposed to eventually reach out to archival storage as well, and do reconstruction if necessary. The cache is what should be used basically all the time, but a cold-storage fallback is still necessary for completeness.

))
}
}

impl<PV> DsnPieceGetter<PV>
Expand Down
2 changes: 1 addition & 1 deletion shared/subspace-data-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ include = [
]

[dependencies]
async-lock = "3.4.0"
anyhow = "1.0.89"
async-trait = "0.1.83"
futures = "0.3.31"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
Expand Down
27 changes: 15 additions & 12 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! Fetching objects stored in the archived history of Subspace Network.

use crate::piece_fetcher::download_pieces;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use crate::piece_getter::ObjectPieceGetter;
use crate::segment_fetcher::{download_segment, SegmentGetterError};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use std::sync::Arc;
Expand Down Expand Up @@ -123,7 +123,7 @@ pub enum Error {
#[error("Getting piece caused an error: {source:?}")]
PieceGetterError {
#[from]
source: BoxError,
source: anyhow::Error,
},

/// Piece getter couldn't find the piece
Expand All @@ -132,9 +132,12 @@ pub enum Error {
}

/// Object fetcher for the Subspace DSN.
pub struct ObjectFetcher {
pub struct ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// The piece getter used to fetch pieces.
piece_getter: Arc<dyn ObjectPieceGetter + Send + Sync + 'static>,
piece_getter: Arc<PG>,

/// The erasure coding configuration of those pieces.
erasure_coding: ErasureCoding,
Expand All @@ -143,21 +146,21 @@ pub struct ObjectFetcher {
max_object_len: usize,
}

impl ObjectFetcher {
impl<PG> ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// Create a new object fetcher with the given configuration.
///
/// `max_object_len` is the amount of data bytes we'll read for a single object before giving
/// up and returning an error, or `None` for no limit (`usize::MAX`).
pub fn new<PG>(
piece_getter: PG,
pub fn new(
piece_getter: Arc<PG>,
erasure_coding: ErasureCoding,
max_object_len: Option<usize>,
) -> Self
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
) -> Self {
Self {
piece_getter: Arc::new(piece_getter),
piece_getter,
erasure_coding,
max_object_len: max_object_len.unwrap_or(usize::MAX),
}
Expand Down
43 changes: 10 additions & 33 deletions shared/subspace-data-retrieval/src/piece_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
//! Fetching pieces of the archived history of Subspace Network.

use crate::object_fetcher::Error;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use crate::piece_getter::ObjectPieceGetter;
use futures::{StreamExt, TryStreamExt};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};

Expand All @@ -31,7 +30,7 @@ use tracing::{debug, trace};
pub async fn download_pieces<PG>(
piece_indexes: &[PieceIndex],
piece_getter: &PG,
) -> Result<Vec<Piece>, BoxError>
) -> anyhow::Result<Vec<Piece>>
where
PG: ObjectPieceGetter,
{
Expand All @@ -42,40 +41,18 @@ where
);

// TODO:
// - consider using a semaphore to limit the number of concurrent requests, like
// download_segment_pieces()
// - if we're close to the number of pieces in a segment, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let received_pieces = piece_indexes
.iter()
.map(|piece_index| async move {
match piece_getter.get_piece(*piece_index).await {
Ok(Some(piece)) => {
trace!(?piece_index, "Piece request succeeded",);
Ok(piece)
}
Ok(None) => {
trace!(?piece_index, "Piece not found");
Err(Error::PieceNotFound {
piece_index: *piece_index,
}
.into())
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request caused an error",
);
Err(error)
}
}
})
.collect::<FuturesOrdered<_>>();
let received_pieces = piece_getter
.get_pieces(piece_indexes.iter().copied())
.await?;

// We want exact pieces, so any errors are fatal.
let received_pieces: Vec<Piece> = received_pieces.try_collect().await?;
let received_pieces: Vec<Piece> = received_pieces
.map(|(piece_index, maybe_piece)| maybe_piece?.ok_or(Error::PieceNotFound { piece_index }))
.try_collect()
.await?;
Comment on lines +52 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

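This change is not equivalent to the previous code. The order of the pieces returned by the stream is arbitrary (which is why the stream yields the piece_index alongside each piece), so collecting it directly no longer guarantees that the pieces come back in the same order as `piece_indexes`, which the previous `FuturesOrdered` collection did.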


trace!(
count = piece_indexes.len(),
Expand Down
Loading
Loading