Skip to content

Commit

Permalink
feat: add outer retry loop to CloudObjectReader::size (#2151)
Browse files Browse the repository at this point in the history
A user was getting periodic errors from GCS:

```
OSError: Io error: Execution error: LanceError(IO): Generic GCS error: Error after 0 retries in 122.602µs, max_retries:10, retry_timeout:180s, source:error sending request for url (https://storage.googleapis.com/some-file.lance): connection error: unexpected end of file, /home/runner/work/lance/lance/rust/lance-io/src/object_reader.rs:61:12
```

This request was not retried because it was a request error (`error
sending request`) rather than a connection error or a timeout, so it
does not meet the retry criteria currently defined by `object_store`.
However, it does appear to be a legitimate "this can sometimes randomly
fail" error that we want to retry.

We already had an outer retry loop in `get_range` for a similar kind of
failure and I just extended that outer retry loop to `size`.
  • Loading branch information
westonpace authored Apr 4, 2024
1 parent 767428f commit 19430cd
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions rust/lance-io/src/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};

Expand Down Expand Up @@ -44,6 +45,27 @@ impl CloudObjectReader {
block_size,
})
}

/// Runs the operation produced by `f`, re-running it when it fails.
///
/// Retries for the initial request are handled by `object_store` itself, but
/// nothing retries failures that occur while the response body is being
/// streamed, so this wraps the whole operation in an outer retry loop.
///
/// `f` is invoked once per attempt to build a fresh future. After the first
/// failure up to three further attempts are made; if the last one also fails,
/// its error is converted with `into()` and returned.
async fn do_with_retry<'a, O>(
    &self,
    f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> Result<O> {
    // Number of additional attempts still allowed after a failure.
    let mut remaining = 3;
    loop {
        let failure = match f().await {
            Ok(output) => return Ok(output),
            Err(failure) => failure,
        };
        if remaining == 0 {
            // Retry budget exhausted: surface the most recent error.
            return Err(failure.into());
        }
        remaining -= 1;
    }
}
}

#[async_trait]
Expand All @@ -58,24 +80,14 @@ impl Reader for CloudObjectReader {

/// Object/File Size.
///
/// Returns the `size` field of the metadata obtained from
/// `object_store.head` for `self.path`.
async fn size(&self) -> Result<usize> {
// NOTE(review): this is a rendered diff without +/- markers. The next line
// looks like the pre-change body (a single `head` call with no outer
// retry) — confirm against the actual file.
Ok(self.object_store.head(&self.path).await?.size)
// NOTE(review): the remaining lines look like the post-change body, which
// routes the `head` call through `do_with_retry` — confirm.
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
}

/// Reads `range` from the object at `self.path` via
/// `object_store.get_range`.
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
// NOTE(review): this is a rendered diff without +/- markers. The comment
// and inline loop below look like the pre-change body — confirm against
// the actual file.
// Retries for the initial request are handled by object store, but
// there are no retries for failures that occur during the streaming
// of the response body. Thus we add an outer retry loop here.
let mut retries = 3;
loop {
// `range` is cloned because each attempt passes the range by value.
match self.object_store.get_range(&self.path, range.clone()).await {
Ok(bytes) => return Ok(bytes),
Err(err) => {
// No retries left: return the most recent error.
if retries == 0 {
return Err(err.into());
}
retries -= 1;
}
}
}
// NOTE(review): the two lines below look like the post-change body, which
// delegates the same retry behaviour to `do_with_retry` — confirm.
self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
.await
}
}

0 comments on commit 19430cd

Please sign in to comment.