Skip to content

Commit

Permalink
integration-tests: Improves fetching sample data
Browse files Browse the repository at this point in the history
by also retrying corrupted archives, being more specific regarding
retryable exceptions and setting a maximum of retries.
  • Loading branch information
funkyfuture committed Jul 26, 2024
1 parent a5e840c commit 5e5a43a
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions integration-tests/fetch-corpora.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@
import io
import os
import tarfile
import zlib
from pathlib import Path
from tempfile import TemporaryFile
from typing import Final, NamedTuple

from httpx import AsyncClient, HTTPError
from tenacity import retry, wait_random_exponential
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_incrementing,
wait_random_exponential,
)

import delb

Expand Down Expand Up @@ -245,18 +252,17 @@ class Archive(NamedTuple):
http_client: Final = AsyncClient()


@retry(wait=wait_random_exponential(multiplier=1, max=120))
async def fetch_resource(url: str, destination: io.BufferedWriter) -> bool:
@retry(
retry=retry_if_exception_type(HTTPError),
wait=wait_random_exponential(multiplier=1, max=120),
stop=stop_after_attempt(12),
reraise=True,
)
async def fetch_resource(url: str, destination: io.BufferedWriter):
async with http_client.stream("GET", url, follow_redirects=True) as response:
try:
async for chunk in response.aiter_bytes():
destination.write(chunk)
except HTTPError as e:
print(f"Failed to fetch {url}: {e}")
return False
else:
print(f"Downloaded {url} to {destination.name}")
return True
async for chunk in response.aiter_bytes():
destination.write(chunk)
print(f"Downloaded {url} to {destination.name}")


def make_archive_filter(archive_description: Archive) -> callable:
Expand Down Expand Up @@ -286,6 +292,12 @@ def _filter(member: tarfile.TarInfo, path: str) -> tarfile.TarInfo | None:
# there's no business like business


@retry(
retry=retry_if_exception_type((EOFError, zlib.error)),
wait=wait_incrementing(start=1, increment=0.5, max=5),
stop=stop_after_attempt(12),
reraise=True,
)
async def fetch_archive(archive_description: Archive):
target_folder = CORPORA_PATH / archive_description.target_directory
if SKIP_EXISTING and target_folder.exists():
Expand Down

0 comments on commit 5e5a43a

Please sign in to comment.