Skip to content

Commit

Permalink
Merge pull request #81 from rifqi2320/main
Browse files Browse the repository at this point in the history
Add parquet support
  • Loading branch information
ets authored Jun 6, 2024
2 parents 08e6399 + 55fd04b commit 761b448
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 2 deletions.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
'openpyxl',
'xlrd',
'paramiko',
'azure-storage-blob>=12.14.0'
'azure-storage-blob>=12.14.0',
'pyarrow>=5.0.0'
],
packages=["tap_spreadsheets_anywhere"],
include_package_data=True,
Expand Down
2 changes: 1 addition & 1 deletion tap_spreadsheets_anywhere/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Required('pattern'): str,
Required('start_date'): str,
Required('key_properties'): [str],
Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'detect'),
Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'parquet', 'detect'),
Optional('encoding'): str,
Optional('invalid_format_action'): Any('ignore','fail'),
Optional('universal_newlines'): bool,
Expand Down
8 changes: 8 additions & 0 deletions tap_spreadsheets_anywhere/format_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
import tap_spreadsheets_anywhere.excel_handler
import tap_spreadsheets_anywhere.json_handler
import tap_spreadsheets_anywhere.jsonl_handler
import tap_spreadsheets_anywhere.parquet_handler

from azure.storage.blob import BlobServiceClient
import os


class InvalidFormatError(Exception):
def __init__(self, fname, message="The file was not in the expected format"):
self.name = fname
Expand Down Expand Up @@ -139,6 +142,8 @@ def get_row_iterator(table_spec, uri):
format = 'jsonl'
elif lowered_uri.endswith(".csv"):
format = 'csv'
elif lowered_uri.endswith(".parquet"):
format = 'parquet'
else:
# TODO: some protocols provide the ability to pull format (content-type) info & we could make use of that here
reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding)
Expand Down Expand Up @@ -169,6 +174,9 @@ def get_row_iterator(table_spec, uri):
# If encoding is set, smart_open will override binary mode ('b' in open_mode) and it will result in a BadZipFile error
reader = get_streamreader(uri, universal_newlines=universal_newlines,newline=None, open_mode='rb', encoding=None)
iterator = tap_spreadsheets_anywhere.excel_handler.get_row_iterator(table_spec, reader)
elif format == 'parquet':
reader = get_streamreader(uri, universal_newlines=universal_newlines, newline=None, open_mode='rb')
iterator = tap_spreadsheets_anywhere.parquet_handler.get_row_iterator(table_spec, reader)
elif format == 'json':
reader = get_streamreader(uri, universal_newlines=universal_newlines, open_mode='r', encoding=encoding)
iterator = tap_spreadsheets_anywhere.json_handler.get_row_iterator(table_spec, reader)
Expand Down
31 changes: 31 additions & 0 deletions tap_spreadsheets_anywhere/parquet_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import re
import logging
import pyarrow.parquet as pq

LOGGER = logging.getLogger(__name__)


def generator_wrapper(table, _={}) -> dict:
# change column name
def format_name(name=""):
formatted_key = re.sub(r"[^\w\s]", "", name)
# replace whitespace with underscores
formatted_key = re.sub(r"\s+", "_", formatted_key)
return formatted_key.lower()

table = table.rename_columns([format_name(name) for name in table.column_names])

for row in table.to_pylist():
yield row


def get_row_iterator(table_spec, file_handle):
try:
parquet_file = pq.ParquetFile(file_handle)
except Exception as e:
LOGGER.error("Unable to read the Parquet file: %s", e)
raise e

# Use batch to read the Parquet file
for batch in parquet_file.iter_batches():
yield from generator_wrapper(batch, table_spec)
Binary file not shown.
Binary file added tap_spreadsheets_anywhere/test/mt-sample.parquet
Binary file not shown.
131 changes: 131 additions & 0 deletions tap_spreadsheets_anywhere/test/test_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import logging
import unittest

from tap_spreadsheets_anywhere import format_handler

LOGGER = logging.getLogger(__name__)

TEST_TABLE_SPEC = {
"tables": [
{
"path": "file://./tap_spreadsheets_anywhere/test",
"name": "parquet-iris",
"pattern": "iris\\-sample\\.parquet",
"start_date": "2017-05-01T00:00:00Z",
"key_properties": [],
"format": "parquet",
},
{
"path": "file://./tap_spreadsheets_anywhere/test",
"name": "parquet-mt",
"pattern": "mt\\-sample\\.parquet",
"start_date": "2017-05-01T00:00:00Z",
"key_properties": [],
"format": "parquet",
},
{
"path": "file://./tap_spreadsheets_anywhere/test",
"name": "parquet-iris-detect",
"pattern": "iris\\.parquet",
"start_date": "2017-05-01T00:00:00Z",
"key_properties": [],
"format": "detect",
},
{
"path": "file://./tap_spreadsheets_anywhere/test",
"name": "parquet-mt-detect",
"pattern": "mt\\.parquet",
"start_date": "2017-05-01T00:00:00Z",
"key_properties": [],
"format": "detect",
},
]
}


class TestParquet(unittest.TestCase):
def test_iris(self):
table_spec = TEST_TABLE_SPEC["tables"][0]
uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet"
iterator = format_handler.get_row_iterator(table_spec, uri)

rows = list(iterator)
self.assertEqual(len(rows), 150)
self.assertEqual(
rows[0],
{
"sepallength": 5.1,
"sepalwidth": 3.5,
"petallength": 1.4,
"petalwidth": 0.2,
"variety": "Setosa",
},
)

def test_mt(self):
table_spec = TEST_TABLE_SPEC["tables"][1]
uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet"
iterator = format_handler.get_row_iterator(table_spec, uri)

rows = list(iterator)
self.assertEqual(len(rows), 32)
self.assertEqual(
rows[0],
{
"model": "Mazda RX4",
"mpg": 21.0,
"cyl": 6,
"disp": 160.0,
"hp": 110,
"drat": 3.9,
"wt": 2.62,
"qsec": 16.46,
"vs": 0,
"am": 1,
"gear": 4,
"carb": 4,
},
)

def test_iris_detect(self):
table_spec = TEST_TABLE_SPEC["tables"][2]
uri = "./tap_spreadsheets_anywhere/test/iris-sample.parquet"
iterator = format_handler.get_row_iterator(table_spec, uri)

rows = list(iterator)
self.assertEqual(len(rows), 150)
self.assertEqual(
rows[0],
{
"sepallength": 5.1,
"sepalwidth": 3.5,
"petallength": 1.4,
"petalwidth": 0.2,
"variety": "Setosa",
},
)

def test_mt_detect(self):
table_spec = TEST_TABLE_SPEC["tables"][3]
uri = "./tap_spreadsheets_anywhere/test/mt-sample.parquet"
iterator = format_handler.get_row_iterator(table_spec, uri)

rows = list(iterator)
self.assertEqual(len(rows), 32)
self.assertEqual(
rows[0],
{
"model": "Mazda RX4",
"mpg": 21.0,
"cyl": 6,
"disp": 160.0,
"hp": 110,
"drat": 3.9,
"wt": 2.62,
"qsec": 16.46,
"vs": 0,
"am": 1,
"gear": 4,
"carb": 4,
},
)

0 comments on commit 761b448

Please sign in to comment.