diff --git a/setup.py b/setup.py index 59a1e17..f923029 100755 --- a/setup.py +++ b/setup.py @@ -18,7 +18,8 @@ 'openpyxl', 'xlrd', 'paramiko', - 'azure-storage-blob>=12.14.0' + 'azure-storage-blob>=12.14.0', + 'pyarrow>=5.0.0' ], packages=["tap_spreadsheets_anywhere"], include_package_data=True, diff --git a/tap_spreadsheets_anywhere/configuration.py b/tap_spreadsheets_anywhere/configuration.py index d131f25..df42f19 100644 --- a/tap_spreadsheets_anywhere/configuration.py +++ b/tap_spreadsheets_anywhere/configuration.py @@ -12,7 +12,7 @@ Required('pattern'): str, Required('start_date'): str, Required('key_properties'): [str], - Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'detect'), + Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'parquet', 'detect'), Optional('encoding'): str, Optional('invalid_format_action'): Any('ignore','fail'), Optional('universal_newlines'): bool, diff --git a/tap_spreadsheets_anywhere/format_handler.py b/tap_spreadsheets_anywhere/format_handler.py index 6bea333..fee0dc4 100644 --- a/tap_spreadsheets_anywhere/format_handler.py +++ b/tap_spreadsheets_anywhere/format_handler.py @@ -5,9 +5,12 @@ import tap_spreadsheets_anywhere.excel_handler import tap_spreadsheets_anywhere.json_handler import tap_spreadsheets_anywhere.jsonl_handler +import tap_spreadsheets_anywhere.parquet_handler + from azure.storage.blob import BlobServiceClient import os + class InvalidFormatError(Exception): def __init__(self, fname, message="The file was not in the expected format"): self.name = fname @@ -139,6 +142,8 @@ def get_row_iterator(table_spec, uri): format = 'jsonl' elif lowered_uri.endswith(".csv"): format = 'csv' + elif lowered_uri.endswith(".parquet"): + format = 'parquet' else: # TODO: some protocols provide the ability to pull format (content-type) info & we could make use of that here reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding) @@ -169,6 +174,9 @@ def get_row_iterator(table_spec, uri): # If encoding is set, smart_open will override binary mode ('b' in open_mode) and it will result in a BadZipFile error reader = get_streamreader(uri, universal_newlines=universal_newlines,newline=None, open_mode='rb', encoding=None) iterator = tap_spreadsheets_anywhere.excel_handler.get_row_iterator(table_spec, reader) + elif format == 'parquet': + reader = get_streamreader(uri, universal_newlines=universal_newlines, newline=None, open_mode='rb') + iterator = tap_spreadsheets_anywhere.parquet_handler.get_row_iterator(table_spec, reader) elif format == 'json': reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding) iterator = tap_spreadsheets_anywhere.json_handler.get_row_iterator(table_spec, reader) diff --git a/tap_spreadsheets_anywhere/parquet_handler.py b/tap_spreadsheets_anywhere/parquet_handler.py new file mode 100644 index 0000000..ccdb464 --- /dev/null +++ b/tap_spreadsheets_anywhere/parquet_handler.py @@ -0,0 +1,31 @@ +import re +import logging +import pyarrow.parquet as pq + +LOGGER = logging.getLogger(__name__) + + +def generator_wrapper(table, _={}) -> dict: + # change column name + def format_name(name=""): + formatted_key = re.sub(r"[^\w\s]", "", name) + # replace whitespace with underscores + formatted_key = re.sub(r"\s+", "_", formatted_key) + return formatted_key.lower() + + table = table.rename_columns([format_name(name) for name in table.column_names]) + + for row in table.to_pylist(): + yield row + + +def get_row_iterator(table_spec, file_handle): + try: + parquet_file = pq.ParquetFile(file_handle) + except Exception as e: + LOGGER.error("Unable to read the Parquet file: %s", e) + raise e + + # Use batch to read the Parquet file + for batch in parquet_file.iter_batches(): + yield from generator_wrapper(batch, table_spec) diff --git a/tap_spreadsheets_anywhere/test/iris-sample.parquet b/tap_spreadsheets_anywhere/test/iris-sample.parquet new file mode 100644 index 0000000..9224dea Binary files /dev/null and b/tap_spreadsheets_anywhere/test/iris-sample.parquet differ diff --git a/tap_spreadsheets_anywhere/test/mt-sample.parquet b/tap_spreadsheets_anywhere/test/mt-sample.parquet new file mode 100644 index 0000000..13085cd Binary files /dev/null and b/tap_spreadsheets_anywhere/test/mt-sample.parquet differ diff --git a/tap_spreadsheets_anywhere/test/test_parquet.py b/tap_spreadsheets_anywhere/test/test_parquet.py new file mode 100644 index 0000000..6e89987 --- /dev/null +++ b/tap_spreadsheets_anywhere/test/test_parquet.py @@ -0,0 +1,131 @@ +import logging +import unittest + +from tap_spreadsheets_anywhere import format_handler + +LOGGER = logging.getLogger(__name__) + +TEST_TABLE_SPEC = { + "tables": [ + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-iris", + "pattern": "iris\\-sample\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "parquet", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-mt", + "pattern": "mt\\-sample\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "parquet", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-iris-detect", + "pattern": "iris\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "detect", + }, + { + "path": "file://./tap_spreadsheets_anywhere/test", + "name": "parquet-mt-detect", + "pattern": "mt\\.parquet", + "start_date": "2017-05-01T00:00:00Z", + "key_properties": [], + "format": "detect", + }, + ] +} + + +class TestParquet(unittest.TestCase): + def test_iris(self): + table_spec = TEST_TABLE_SPEC["tables"][0] + uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 150) + self.assertEqual( + rows[0], + { + "sepallength": 5.1, + "sepalwidth": 3.5, + "petallength": 1.4, + "petalwidth": 0.2, + "variety": "Setosa", + }, + ) + + def test_mt(self): + table_spec = TEST_TABLE_SPEC["tables"][1] + uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 32) + self.assertEqual( + rows[0], + { + "model": "Mazda RX4", + "mpg": 21.0, + "cyl": 6, + "disp": 160.0, + "hp": 110, + "drat": 3.9, + "wt": 2.62, + "qsec": 16.46, + "vs": 0, + "am": 1, + "gear": 4, + "carb": 4, + }, + ) + + def test_iris_detect(self): + table_spec = TEST_TABLE_SPEC["tables"][2] + uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 150) + self.assertEqual( + rows[0], + { + "sepallength": 5.1, + "sepalwidth": 3.5, + "petallength": 1.4, + "petalwidth": 0.2, + "variety": "Setosa", + }, + ) + + def test_mt_detect(self): + table_spec = TEST_TABLE_SPEC["tables"][3] + uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet" + iterator = format_handler.get_row_iterator(table_spec, uri) + + rows = list(iterator) + self.assertEqual(len(rows), 32) + self.assertEqual( + rows[0], + { + "model": "Mazda RX4", + "mpg": 21.0, + "cyl": 6, + "disp": 160.0, + "hp": 110, + "drat": 3.9, + "wt": 2.62, + "qsec": 16.46, + "vs": 0, + "am": 1, + "gear": 4, + "carb": 4, + }, + )