Skip to content

Commit

Permalink
adds retries to google sheets
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Aug 14, 2023
1 parent baad8d4 commit 515c1b7
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 15 deletions.
26 changes: 22 additions & 4 deletions .github/workflows/test_on_local_destinations_forks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
# if: ${{ github.event.pull_request.head.repo.fork }}

run_loader:
name: test on local postgres and duckdb
name: test on local postgres and duckdb on forks
needs: get_changed_sources
if: needs.get_changed_sources.outputs.sources_list != ''
strategy:
Expand All @@ -31,6 +31,26 @@ jobs:
shell: bash
runs-on: "ubuntu-latest"

# Service containers to run with `container-job`
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
POSTGRES_USER: loader
POSTGRES_PASSWORD: loader
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Check out
uses: actions/checkout@master
Expand Down Expand Up @@ -61,9 +81,7 @@ jobs:

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > sources/.dlt/secrets.toml
# run: pwd && echo "$DESTINATIONS_SECRETS" > sources/.dlt/secrets.toml && echo "$SOURCES_SECRETS" >> sources/.dlt/secrets.toml
# - name: Setup upterm session
# uses: lhotari/action-upterm@v1

- run: |
sources_list="${{ needs.get_changed_sources.outputs.sources_list }}"
test_paths=$(echo "$sources_list" | awk '{for(i=1;i<=NF;i++) printf "tests/%s ", $i}')
Expand Down
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ ALL_DESTINATIONS='["postgres"]' pytest tests/chess

There's also `make test-local` command that will run all the tests on `duckdb` and `postgres`.
## Running tests on CI
## Advanced topics
### Ensuring the correct Python version
Expand Down
14 changes: 5 additions & 9 deletions sources/google_sheets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def google_spreadsheet(
] = dlt.secrets.value,
get_sheets: bool = False,
get_named_ranges: bool = True,
max_api_retries: int = 5,
) -> Iterable[DltResource]:
"""
The source for the dlt pipeline. It returns the following resources:
Expand All @@ -43,12 +44,13 @@ def google_spreadsheet(
Defaults to False.
get_named_ranges (bool, optional): If True, load all the named ranges inside the spreadsheet into the database.
Defaults to True.
max_api_retries (int, optional): Max number of retires to google sheets API. Actual behavior is internal to google client.
Yields:
Iterable[DltResource]: List of dlt resources.
"""
# authenticate to the service using the helper function
service = api_auth(credentials)
service = api_auth(credentials, max_api_retries=max_api_retries)
# get spreadsheet id from url or id
spreadsheet_id = get_spreadsheet_id(spreadsheet_url_or_id)
all_range_names = set(range_names or [])
Expand Down Expand Up @@ -107,14 +109,8 @@ def google_spreadsheet(
metadata_table[-1]["skipped"] = False
range_data.append((name, parsed_range, meta_range, values))

meta_values = (
service.spreadsheets()
.get(
spreadsheetId=spreadsheet_id,
ranges=[str(data[2]) for data in range_data],
includeGridData=True,
)
.execute()
meta_values = api_calls.get_meta_for_ranges(
service, spreadsheet_id, [str(data[2]) for data in range_data]
)
for name, parsed_range, _, values in range_data:
logger.info(f"Processing range {parsed_range} with name {name}")
Expand Down
56 changes: 54 additions & 2 deletions sources/google_sheets/helpers/api_calls.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Contains helper functions to extract data from spreadsheet API"""

from typing import Any, List, Tuple
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception

from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny

from dlt.sources.credentials import GcpCredentials, GcpOAuthCredentials
from dlt.sources.helpers.requests.retry import DEFAULT_RETRY_STATUS

from .data_processing import ParsedRange, trim_range_top_left

Expand All @@ -15,23 +17,72 @@
raise MissingDependencyException("Google API Client", ["google-api-python-client"])


def api_auth(credentials: GcpCredentials) -> Resource:
def is_retry_status_code(exception: BaseException) -> bool:
"""Retry condition on HttpError"""
from googleapiclient.errors import HttpError # type: ignore

# print(f"RETRY ON {str(HttpError)} = {isinstance(exception, HttpError) and exception.resp.status in DEFAULT_RETRY_STATUS}")
# if isinstance(exception, HttpError):
# print(exception.resp.status)
# print(DEFAULT_RETRY_STATUS)
return (
isinstance(exception, HttpError)
and exception.resp.status in DEFAULT_RETRY_STATUS
)


retry_deco = retry(
# Retry if it's a rate limit error (HTTP 429)
retry=retry_if_exception(is_retry_status_code),
# Use exponential backoff for the waiting time between retries, starting with 5 seconds
wait=wait_exponential(multiplier=1.5, min=5, max=120),
# Stop retrying after 10 attempts
stop=stop_after_attempt(10),
# Print out the retrying details
reraise=True,
)


def api_auth(credentials: GcpCredentials, max_api_retries: int) -> Resource:
"""
Uses GCP credentials to authenticate with Google Sheets API.
Args:
credentials (GcpCredentials): Credentials needed to log in to GCP.
max_api_retries (int): Max number of retires to google sheets API. Actual behavior is internal to google client.
Returns:
Resource: Object needed to make API calls to Google Sheets API.
"""
if isinstance(credentials, GcpOAuthCredentials):
credentials.auth("https://www.googleapis.com/auth/spreadsheets.readonly")
# Build the service object for Google sheets api.
service = build("sheets", "v4", credentials=credentials.to_native_credentials())
service = build(
"sheets",
"v4",
credentials=credentials.to_native_credentials(),
num_retries=max_api_retries,
)
return service


@retry_deco
def get_meta_for_ranges(
service: Resource, spreadsheet_id: str, range_names: List[str]
) -> Any:
"""Retrieves `spreadsheet_id` cell metadata for `range_names`"""
return (
service.spreadsheets()
.get(
spreadsheetId=spreadsheet_id,
ranges=range_names,
includeGridData=True,
)
.execute()
)


@retry_deco
def get_known_range_names(
spreadsheet_id: str, service: Resource
) -> Tuple[List[str], List[str], str]:
Expand All @@ -52,6 +103,7 @@ def get_known_range_names(
return sheet_names, named_ranges, title


@retry_deco
def get_data_for_ranges(
service: Resource, spreadsheet_id: str, range_names: List[str]
) -> List[Tuple[str, ParsedRange, ParsedRange, List[List[Any]]]]:
Expand Down

0 comments on commit 515c1b7

Please sign in to comment.