skips blank columns, adds demo for multiple spreadsheets and table rename (#238)
rudolfix authored Aug 9, 2023
1 parent 8b3e940 commit 173e709
Showing 6 changed files with 226 additions and 25 deletions.
6 changes: 5 additions & 1 deletion sources/google_sheets/README.md
@@ -14,6 +14,10 @@ If you are not happy with the workflow above, you can:
* Enable retrieving all sheets/tabs with `get_sheets` option set to True
* Pass a list of ranges as supported by Google Sheets in `range_names`
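
For example, a minimal sketch of both options (the spreadsheet URL, range names, and pipeline settings below are placeholders; the import path assumes the layout used by the demo script in this repo):

```python
import dlt

from google_sheets import google_spreadsheet

# load only the listed ranges; any A1 notation or named range accepted by Google Sheets works
explicit_ranges = google_spreadsheet(
    "https://docs.google.com/spreadsheets/d/<your-spreadsheet-id>",
    range_names=["Sheet 1!A1:D10", "MyNamedRange"],
    get_sheets=False,
    get_named_ranges=False,
)

# or load every sheet/tab instead of naming ranges explicitly
all_tabs = google_spreadsheet(
    "https://docs.google.com/spreadsheets/d/<your-spreadsheet-id>",
    get_sheets=True,
    get_named_ranges=False,
)

pipeline = dlt.pipeline(destination="duckdb", dataset_name="sheets_data")
print(pipeline.run(explicit_ranges))
```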

Note that hidden columns will be extracted.

> 💡 You can load data from many spreadsheets and also rename the tables into which the data is loaded. This is a standard part of `dlt`; see the `load_with_table_rename_and_multiple_spreadsheets` demo in `google_sheets_pipeline.py`.
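
Condensed from the `load_with_table_rename_and_multiple_spreadsheets` demo added in this commit (spreadsheet ids are placeholders), the rename boils down to applying a table-name hint to the resource named after the range:

```python
import dlt

from google_sheets import google_spreadsheet  # import path assumed from the demo script

data = google_spreadsheet(
    "<spreadsheet-id-1>", range_names=["Sheet 1!A1:B10"], get_named_ranges=False
)
data_2 = google_spreadsheet(
    "<spreadsheet-id-2>", range_names=["Sheet 1!B1:C10"], get_named_ranges=False
)

# the resource name is the range name; rename the destination tables via hints
data.resources["Sheet 1!A1:B10"].apply_hints(table_name="my_a1_data")
data_2.resources["Sheet 1!B1:C10"].apply_hints(table_name="second_sheet_data")

pipeline = dlt.pipeline(destination="duckdb", dataset_name="sheets_data")
print(pipeline.run([data, data_2]))
```
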
### Make sure your data has headers and is a proper table
**The first row of any extracted range should contain headers**. Please make sure:
1. The header names are strings and are unique.
@@ -52,7 +56,7 @@ You can pass explicit ranges to the `google_spreadsheet`:

## The `spreadsheet_info` table
This table is repopulated after every load and keeps the information on loaded ranges:
* id of the spreadsheet
* id and title of the spreadsheet
* name of the range as passed to the source
* string representation of the loaded range
* range above in parsed representation
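
After a load, the table can be inspected through the pipeline's SQL client. A sketch (the column list above is abbreviated by the diff, and the `skipped` flag is inferred from this commit's tests, so names may differ):

```python
import dlt

from google_sheets import google_spreadsheet  # import path assumed from the demo script

pipeline = dlt.pipeline(destination="duckdb", dataset_name="sheets_data")
pipeline.run(
    google_spreadsheet("<your-spreadsheet-id>", get_sheets=True, get_named_ranges=False)
)

# query the metadata table written alongside the data
with pipeline.sql_client() as client:
    with client.execute_query(
        # `skipped` is inferred from the tests in this commit and may be named differently
        "SELECT range_name, title, skipped FROM spreadsheet_info ORDER BY range_name"
    ) as cur:
        for range_name, title, skipped in cur.fetchall():
            print(range_name, title, "skipped" if skipped else "loaded")
```
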
9 changes: 5 additions & 4 deletions sources/google_sheets/__init__.py
@@ -53,11 +53,11 @@ def google_spreadsheet(
spreadsheet_id = get_spreadsheet_id(spreadsheet_url_or_id)
all_range_names = set(range_names or [])
# if no explicit ranges, get sheets and named ranges from metadata
# get metadata with list of sheets and named ranges in the spreadsheet
sheet_names, named_ranges, spreadsheet_title = api_calls.get_known_range_names(
spreadsheet_id=spreadsheet_id, service=service
)
if not range_names:
# get metadata with list of sheets and named ranges in the spreadsheet
sheet_names, named_ranges = api_calls.get_known_range_names(
spreadsheet_id=spreadsheet_id, service=service
)
if get_sheets:
all_range_names.update(sheet_names)
if get_named_ranges:
@@ -86,6 +86,7 @@ def google_spreadsheet(
metadata_table.append(
{
"spreadsheet_id": spreadsheet_id,
"title": spreadsheet_title,
"range_name": name,
"range": str(parsed_range),
"range_parsed": parsed_range._asdict(),
7 changes: 4 additions & 3 deletions sources/google_sheets/helpers/api_calls.py
@@ -34,7 +34,7 @@ def api_auth(credentials: GcpCredentials) -> Resource:

def get_known_range_names(
spreadsheet_id: str, service: Resource
) -> Tuple[List[str], List[str]]:
) -> Tuple[List[str], List[str], str]:
"""
Retrieves spreadsheet metadata and extracts a list of sheet names and named ranges
@@ -43,12 +43,13 @@ def get_known_range_names(
service (Resource): Resource object used to make API calls to Google Sheets API.
Returns:
Tuple[List[str], List[str]]
Tuple[List[str], List[str], str]: sheet names, named ranges, spreadsheet title
"""
metadata = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
sheet_names: List[str] = [s["properties"]["title"] for s in metadata["sheets"]]
named_ranges: List[str] = [r["name"] for r in metadata.get("namedRanges", {})]
return sheet_names, named_ranges
title: str = metadata["properties"]["title"]
return sheet_names, named_ranges, title


def get_data_for_ranges(
15 changes: 10 additions & 5 deletions sources/google_sheets/helpers/data_processing.py
@@ -140,13 +140,18 @@ def get_range_headers(headers_metadata: List[DictStrAny], range_name: str) -> Li
for idx, header in enumerate(headers_metadata):
header_val: str = None
if header:
if "stringValue" in header["effectiveValue"]:
if "stringValue" in header.get("effectiveValue", {}):
header_val = header["formattedValue"]
else:
logger.warning(
f"In range {range_name}, header value: {header['formattedValue']} is not a string!"
)
return None
header_val = header.get("formattedValue", None)
# if there's no formatted value, the cell is truly empty (not even an empty string); in that case add an auto-generated name and move on
if header_val is None:
header_val = str(f"col_{idx + 1}")
else:
logger.warning(
f"In range {range_name}, header value: {header_val} at position {idx+1} is not a string!"
)
return None
else:
logger.warning(
f"In range {range_name}, header at position {idx+1} is not missing!"
49 changes: 46 additions & 3 deletions sources/google_sheets_pipeline.py
@@ -14,7 +14,8 @@ def load_pipeline_with_ranges() -> None:
)
data = google_spreadsheet(
"https://docs.google.com/spreadsheets/d/1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580/edit#gid=0",
range_names=["NamedRange1", "Sheet 1", "Sheet 1!A1:D4"],
range_names=["hidden_columns_merged_cells", "Blank Columns"],
# range_names=["NamedRange1", "Sheet 1", "Sheet 1!A1:D4"],
get_sheets=False,
get_named_ranges=False,
)
@@ -33,7 +34,9 @@ def load_pipeline_with_sheets() -> None:
dataset_name="sample_google_sheet_data",
)
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
"https://docs.google.com/spreadsheets/d/1BcG3BxKpe_v2kdIPdHE83yqG4oYUUZmL3Bc_rTZ59M4",
# "1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
# "https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0",
get_sheets=True,
get_named_ranges=False,
)
@@ -79,5 +82,45 @@ def load_pipeline_with_sheets_and_ranges() -> None:
print(info)


def load_with_table_rename_and_multiple_spreadsheets() -> None:
"""Demonstrates how to load two spreadsheets in one pipeline and how to rename tables"""

pipeline = dlt.pipeline(
pipeline_name="google_sheets_pipeline",
destination="duckdb",
full_refresh=False,
dataset_name="sample_google_sheet_data",
)

# take data from spreadsheet 1
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
range_names=["Sheet 1!A1:B10"],
get_named_ranges=False,
)

# take data from spreadsheet 2
data_2 = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
range_names=["Sheet 1!B1:C10"],
get_named_ranges=False,
)
# apply the table name to the existing resource: the resource name is the name of the range
data.resources["Sheet 1!A1:B10"].apply_hints(table_name="my_a1_data")
data_2.resources["Sheet 1!B1:C10"].apply_hints(table_name="second_sheet_data")

# load two spreadsheets
info = pipeline.run([data, data_2])
print(info)
# yes the tables are there
user_tables = pipeline.default_schema.data_tables()
# check if table is there
assert set([t["name"] for t in user_tables]) == {
"my_a1_data",
"second_sheet_data",
"spreadsheet_info",
}


if __name__ == "__main__":
load_pipeline_with_ranges()
load_with_table_rename_and_multiple_spreadsheets()
165 changes: 156 additions & 9 deletions tests/google_sheets/test_google_sheets_source.py
@@ -14,7 +14,7 @@


# list expected tables and the number of columns they are supposed to have
ALL_RANGES = [
ALL_RANGES = {
"empty",
"all_types",
"empty_row",
@@ -34,16 +34,25 @@
"sheet3",
"sheet4",
"two_tables",
]
"hidden_columns_merged_cells",
"Blank Columns",
}

SKIPPED_RANGES = [
SKIPPED_RANGES = {
"empty",
"only_data",
"only_headers",
"NamedRange2",
]
}

ALL_TABLES_LOADED = [
NAMED_RANGES = {
"NamedRange1",
"NamedRange2",
}

SHEETS = ALL_RANGES - NAMED_RANGES

ALL_TABLES_LOADED = {
"all_types",
"empty_row",
"empty_rows",
@@ -60,7 +69,9 @@
"sheet4",
"spreadsheet_info",
"two_tables",
]
"hidden_columns_merged_cells",
"blank_columns",
}


def test_single_explicit_range_load() -> None:
@@ -95,7 +106,7 @@ def test_full_load(destination_name: str) -> None:
# ALL_TABLES is missing spreadsheet info table - table being tested here
schema = pipeline.default_schema
user_tables = schema.data_tables()
assert set([t["name"] for t in user_tables]) == set(ALL_TABLES_LOADED)
assert set([t["name"] for t in user_tables]) == ALL_TABLES_LOADED

# check load metadata
with pipeline.sql_client() as c:
@@ -104,9 +115,90 @@ def test_full_load(destination_name: str) -> None:
with c.execute_query(sql_query) as cur:
rows = list(cur.fetchall())
loaded_ranges = [r[0] for r in rows]
assert set(loaded_ranges) == set(ALL_RANGES)
assert set(loaded_ranges) == ALL_RANGES
skipped_ranges = [r[0] for r in rows if r[1]]
assert set(skipped_ranges) == set(SKIPPED_RANGES)
assert set(skipped_ranges) == SKIPPED_RANGES


def test_get_named_ranges() -> None:
# take data from spreadsheet 1
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_named_ranges=True,
get_sheets=False,
)
assert set(data.resources.keys()) == {"NamedRange1", "spreadsheet_info"}


def test_get_sheets() -> None:
# take data from spreadsheet 1
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_named_ranges=False,
get_sheets=True,
)
assert set(data.resources.keys()) - {"spreadsheet_info"} == SHEETS - SKIPPED_RANGES


def test_get_fancy_sheets() -> None:
# take data from spreadsheet 1
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_named_ranges=False,
get_sheets=True,
)
assert set(data.resources.keys()) - {"spreadsheet_info"} == SHEETS - SKIPPED_RANGES


def test_blank_columns() -> None:
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_named_ranges=False,
get_sheets=False,
range_names=["Blank Columns"],
)
pipeline = dlt.pipeline(
destination="duckdb",
full_refresh=True,
dataset_name="test_blank_columns_data",
)
pipeline.extract(data)
pipeline.normalize()
# there were two blank columns that got automatic col_n names but contained no data, so dlt eliminated them
assert set(pipeline.default_schema.get_table_columns("blank_columns").keys()) == {
"vergleich",
"anbieter",
"art",
"grundpreis_mtl",
"verbrauch",
"jahreskosten",
"netto_mtl_kosten",
"brutto_mtl_kosten",
"_dlt_id",
"_dlt_load_id",
}


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_hidden_columns_merged_cells(destination_name) -> None:
info, pipeline = _run_pipeline(
destination_name=destination_name,
dataset_name="test_hidden_columns_merged_cells",
range_names=["hidden_columns_merged_cells"],
get_named_ranges=False,
get_sheets=False,
)
assert_load_info(info)

# merged cells produce empty values but number of rows stays the same
assert load_table_counts(pipeline, "hidden_columns_merged_cells") == {
"hidden_columns_merged_cells": 7
}

# hidden columns are returned
assert "art" in pipeline.default_schema.get_table_columns(
"hidden_columns_merged_cells"
)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
@@ -483,6 +575,61 @@ def test_auto_header_names():
pass


def test_table_rename() -> None:
pipeline = dlt.pipeline(
destination="duckdb",
full_refresh=True,
dataset_name="test_table_rename_data",
)
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
range_names=["Sheet 1!A1:B10"],
get_named_ranges=False,
)
# apply the table name to the existing resource: the resource name is the name of the range
data.resources["Sheet 1!A1:B10"].apply_hints(table_name="my_a1_data")
info = pipeline.run(data)
assert_load_info(info)
user_tables = pipeline.default_schema.data_tables()
# check if table is there
assert set([t["name"] for t in user_tables]) == {"my_a1_data", "spreadsheet_info"}


def test_table_rename_and_multiple_spreadsheets() -> None:
pipeline = dlt.pipeline(
destination="duckdb",
full_refresh=True,
dataset_name="test_table_rename_data",
)
# take data from spreadsheet 1
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
range_names=["Sheet 1!A1:B10"],
get_named_ranges=False,
)

# take data from spreadsheet 2
data_2 = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
range_names=["Sheet 1!B1:C10"],
get_named_ranges=False,
)
# apply the table name to the existing resource: the resource name is the name of the range
data.resources["Sheet 1!A1:B10"].apply_hints(table_name="my_a1_data")
data_2.resources["Sheet 1!B1:C10"].apply_hints(table_name="second_sheet_data")

# load two spreadsheets
info = pipeline.run([data, data_2])
assert_load_info(info)
user_tables = pipeline.default_schema.data_tables()
# check if table is there
assert set([t["name"] for t in user_tables]) == {
"my_a1_data",
"second_sheet_data",
"spreadsheet_info",
}


def test_no_ranges():
# no ranges to extract
info, pipeline = _run_pipeline(
