Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch multiple pieces during object reconstruction #3158

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

teor2345
Copy link
Contributor

This PR changes ObjectFetcher to use the multi-piece fetching methods of PieceProvider.

As part of this change, ObjectFetcher couldn't be used as a dyn trait any more, because of the generic bounds on get_pieces().

Code contributor checklist:

@teor2345 teor2345 added the enhancement New feature or request label Oct 22, 2024
@teor2345 teor2345 self-assigned this Oct 22, 2024
Comment on lines +79 to +81
self.get_from_cache(piece_indices)
.await
.map(|(index, maybe_piece)| (index, Ok(maybe_piece))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth having a TODO that this is supposed to eventually reach out to archival storage too and do reconstruction if necessary. Cache is what should be used basically all the time, but cold storage fallback is still necessary for completeness.

Comment on lines +52 to +55
let received_pieces: Vec<Piece> = received_pieces
.map(|(piece_index, maybe_piece)| maybe_piece?.ok_or(Error::PieceNotFound { piece_index }))
.try_collect()
.await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not equivalent to before. The order of returned pieces is arbitrary (hence piece_index in returned stream in addition to just pieces).

// - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or
// - the number of pending pieces gets smaller, eventually triggering another batch.
// We also exit early if we have enough pieces to reconstruct a segment.
'fetcher: while !piece_indexes.is_empty()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this into two stages:

  • first try to download all 128 source pieces, if successful reconstruction will be very cheap and fast
  • as a fallback when actual reconstruction is needed, schedule more pieces to download, including parity

Right now it is implemented in a way that is a bit wasteful in terms of bandwidth (triggers more downloads than needed) and in terms of CPU usage (has overwhelmingly high chance of not getting 128 source pieces for cheap segment reconstruction).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already what the code does?

The first batch contains 128 source piece indexes. If all pieces in that batch succeed, then there aren’t any parity piece requests.

But as soon as any pieces fail, a batch of parity piece indexes is created, which contains exactly the number of pieces needed to compensate for those failures. Then all batches are polled concurrently.

The code assumes that any pieces that are still pending will succeed, so there’s no wasted downloads.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the way it is written wasn't as clear, but now I understand what it does. If you exhaust the stream of pieces you've generated every time, why do you keep already finished streams in piece_streams (the question above)? I don't see how multiple streams can be pooled concurrently here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the way it is written wasn't as clear, but now I understand what it does.

Good feedback, I might split it into multiple methods so it's clearer.

flatten_unordered() polls all the streams in the vector concurrently, and can return the pieces from any stream.

ready_chunks() waits until at least one piece result is ready, then returns all the ready pieces as a vector. But if any pieces are still pending, they are left in the stream.

Then if any of the piece results in that vector are None, we add a batch of parity pieces to replace them.

We can definitely drop streams once they're done, I'll make some notes about how to do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually already wrote an efficient piece retrieval for reconstruction purposes in https://github.com/autonomys/subspace/blob/362c1f5dce076b6c13452977a533f9091d515bb1/crates/subspace-farmer-components/src/segment_reconstruction.rs

It might be a little simpler and it does try to download pieces in batches all the time, avoiding batches of 1 that you will most likely get majority of time after initial request due to the way ready_chunks works. It is overall less eager and tries to not do a lot of heavy lifting.

Can be extracted into a utility somewhere to return a segment worth of pieces and then reused in farmer code, here and in DSN sync on the node, where exactly the same logic will be necessary for piece retrieval, just what we do with those pieces is slightly different.

Base automatically changed from gateway-args to main October 22, 2024 12:57
Copy link
Contributor Author

@teor2345 teor2345 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll turn some of my comments on this PR into code comments, so the structure of the code is clearer.

impl<PG> SubspaceGatewayRpcApiServer for SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error> {
// TODO: deny unsafe RPC calls
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: remove these TODOs, we might not need them

// - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or
// - the number of pending pieces gets smaller, eventually triggering another batch.
// We also exit early if we have enough pieces to reconstruct a segment.
'fetcher: while !piece_indexes.is_empty()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already what the code does?

The first batch contains 128 source piece indexes. If all pieces in that batch succeed, then there aren’t any parity piece requests.

But as soon as any pieces fail, a batch of parity piece indexes is created, which contains exactly the number of pieces needed to compensate for those failures. Then all batches are polled concurrently.

The code assumes that any pieces that are still pending will succeed, so there’s no wasted downloads.

Copy link
Member

@shamil-gadelshin shamil-gadelshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense overall, just a question related to a special case.

if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
trace!(%segment_index, "Received half of the segment.");
break 'fetcher;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we consider a case with insufficient pieces present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there aren't enough pieces, we go to the top of the 'fetcher: loop again, and try to get more pieces from the segment.

If we've tried all the pieces in the segment, and can't get enough to reconstruct the segment, we end the 'fetcher: loop, and return an error.

That error isn't visible in this diff, it's on line 159/175, about 10 lines below this line.

Copy link
Contributor Author

@teor2345 teor2345 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you both for the reviews! I'll work on a refactor to make this code clearer.

if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
trace!(%segment_index, "Received half of the segment.");
break 'fetcher;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there aren't enough pieces, we go to the top of the 'fetcher: loop again, and try to get more pieces from the segment.

If we've tried all the pieces in the segment, and can't get enough to reconstruct the segment, we end the 'fetcher: loop, and return an error.

That error isn't visible in this diff, it's on line 159/175, about 10 lines below this line.

// - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or
// - the number of pending pieces gets smaller, eventually triggering another batch.
// We also exit early if we have enough pieces to reconstruct a segment.
'fetcher: while !piece_indexes.is_empty()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the way it is written wasn't as clear, but now I understand what it does.

Good feedback, I might split it into multiple methods so it's clearer.

flatten_unordered() polls all the streams in the vector concurrently, and can return the pieces from any stream.

ready_chunks() waits until at least one piece result is ready, then returns all the ready pieces as a vector. But if any pieces are still pending, they are left in the stream.

Then if any of the piece results in that vector are None, we add a batch of parity pieces to replace them.

We can definitely drop streams once they're done, I'll make some notes about how to do that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants