TDL-20219: Add support for JSONL files #40

Open · wants to merge 20 commits into crest-master

Changes from 19 commits
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -23,7 +23,7 @@ jobs:
name: 'Unit Tests'

Review comment: Do update the singer-encoding version.

command: |
source /usr/local/share/virtualenvs/tap-sftp/bin/activate
pip install nose coverage
pip install nose coverage parameterized
nosetests --with-coverage --cover-erase --cover-package=tap_sftp --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
5 changes: 4 additions & 1 deletion tap_sftp/client.py
@@ -145,7 +145,10 @@ def get_files_by_prefix(self, prefix):

return files

def get_files(self, prefix, search_pattern, modified_since=None):
def get_files(self, table_spec, modified_since=None):
prefix = table_spec['search_prefix']
search_pattern = table_spec['search_pattern']

files = self.get_files_by_prefix(prefix)
if files:
LOGGER.info('Found %s files in "%s"', len(files), prefix)
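
The get_files change above only reshuffles the signature: the client now receives the whole table_spec and unpacks search_prefix/search_pattern itself. A minimal, self-contained sketch of the before/after call shape (the spec values here are made up):

def get_files_old(prefix, search_pattern, modified_since=None):
    return (prefix, search_pattern, modified_since)

def get_files_new(table_spec, modified_since=None):
    # same inputs, but prefix and pattern now come from the spec dict
    return (table_spec['search_prefix'], table_spec['search_pattern'], modified_since)

spec = {'search_prefix': '/upload', 'search_pattern': r'orders.*\.jsonl'}  # hypothetical values
assert get_files_old('/upload', r'orders.*\.jsonl') == get_files_new(spec)
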
179 changes: 174 additions & 5 deletions tap_sftp/discover.py
@@ -1,19 +1,188 @@
import gzip
import itertools
import json
import socket
import zipfile
import backoff
import singer

from singer_encodings import json_schema
import csv as python_csv
from singer_encodings import json_schema, csv, compression, jsonl, schema as se_schema
from singer import metadata
from tap_sftp import client

LOGGER= singer.get_logger()
LOGGER = singer.get_logger()

def compression_infer_local(iterable, file_name):
"""Uses the incoming file_name and checks the end of the string for supported compression types"""
if not file_name:
raise Exception("Need file name")

if file_name.endswith('.tar.gz'):
raise NotImplementedError("tar.gz not supported")
elif file_name.endswith('.gz'):
yield gzip.GzipFile(fileobj=iterable)
elif file_name.endswith('.zip'):
with zipfile.ZipFile(iterable) as zip:
for name in zip.namelist():
yield zip.open(name)
else:
yield iterable
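
A self-contained sketch (stdlib only, hypothetical file name) of the extension-based dispatch above: a '.gz' stream is wrapped in GzipFile so reads come back decompressed, while anything without a recognised suffix is passed through unchanged.

import gzip
import io

payload = b'id,name\n1,alpha\n'
gz_buf = io.BytesIO()
with gzip.GzipFile(fileobj=gz_buf, mode='w') as gz:
    gz.write(payload)
gz_buf.seek(0)

def infer(handle, file_name):
    if file_name.endswith('.gz'):
        yield gzip.GzipFile(fileobj=handle)  # reads are transparently decompressed
    else:
        yield handle                         # plain file: yield as-is

for handle in infer(gz_buf, 'sample.csv.gz'):
    print(handle.read() == payload)          # True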

def get_row_iterators_local(iterable, options={}, infer_compression=False, headers_in_catalog=None, with_duplicate_headers=False):
"""
Accepts an iterable, options, a compression flag, catalog headers, and a flag for duplicate headers;
infers compression and yields (file_type, iterator) tuples that can be used to read JSONL or CSV rows.
"""
if infer_compression:
compressed_iterables = compression_infer_local(iterable, options.get('file_name'))

for item in compressed_iterables:

# Try to parse as JSONL
try:
# Duplicate the 'item' iterator because 'get_JSONL_iterators' consumes the 1st row of 'item'
# while probing it as JSONL. Without the copy, a 'JSONDecodeError' would leave 'item'
# positioned after the 1st row, so 'csv.get_row_iterator' would sync only the remaining records
item, item_for_JSONL = itertools.tee(item)
yield ('jsonl', jsonl.get_JSONL_iterators(item_for_JSONL, options))
continue
except json.JSONDecodeError:
pass

# Maximize the CSV field width
csv.maximize_csv_field_width()

# Finally parse as CSV
yield ('csv', csv.get_row_iterator(item, options=options))
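
A minimal, standalone sketch of why the itertools.tee copy matters: probing the first line as JSON consumes it, so without the duplicate the CSV fallback would silently start at the second record. (The line content here is invented.)

import itertools
import json

lines = iter([b'id,name\n', b'1,alpha\n'])   # not valid JSON, so the probe fails
lines, probe = itertools.tee(lines)
try:
    json.loads(next(probe))                  # only the copy is consumed
    file_type = 'jsonl'
except json.JSONDecodeError:
    file_type = 'csv'
print(file_type, list(lines))                # csv [b'id,name\n', b'1,alpha\n']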


# pylint: disable=too-many-arguments
def sample_files_local(conn, table_spec, files, sample_rate=1, max_records=1000, max_files=5):
"""Function to sample matched files as per the sampling rate and the max records to sample"""
LOGGER.info("Sampling files (max files: %s)", max_files)
to_return = []
empty_samples = []

files_so_far = 0

sorted_files = sorted(
files, key=lambda f: f['last_modified'], reverse=True)

for f in sorted_files:
empty_file, samples = sample_file_local(conn, table_spec, f, sample_rate, max_records)

if empty_file:
empty_samples += samples
else:
to_return += samples

files_so_far += 1

if files_so_far >= max_files:
break

if len(to_return) == 0:
return empty_samples

return to_return


def sample_file_local(conn, table_spec, f, sample_rate, max_records):
"""Function to sample a file and return list of records for that file"""

LOGGER.info('Sampling %s (max records: %s, sample rate: %s)',
f['filepath'],
max_records,
sample_rate)

samples = []
file_name = f['filepath']

try:
file_handle = conn.get_file_handle(f)
except OSError:
return (False, samples)

# Add file_name to opts and flag infer_compression to support gzipped files
opts = {'key_properties': table_spec['key_properties'],
'delimiter': table_spec.get('delimiter', ','),
'file_name': file_name}

readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True)

for _, reader in readers:
current_row = 0
for row in reader:
if (current_row % sample_rate) == 0:
if row.get(csv.SDC_EXTRA_COLUMN):
row.pop(csv.SDC_EXTRA_COLUMN)
samples.append(row)

current_row += 1

if len(samples) >= max_records:
break

LOGGER.info("Sampled %s rows from %s", len(samples), file_name)
# Empty sample to show field selection, if needed
empty_file = False
if len(samples) == 0:
empty_file = True
# If the 'reader' is an instance of 'csv.DictReader' and has fieldnames,
# prepare a sample record with 'None' values for each field name.
# Assumes all reader objects in 'readers' have the same fieldnames
if isinstance(reader, python_csv.DictReader) and reader.fieldnames is not None:
samples.append({name: None for name in reader.fieldnames})

return (empty_file, samples)
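
A self-contained sketch (plain stdlib csv, invented header) of the empty-sample fallback above: a header-only CSV yields no data rows, so one all-None record is fabricated from DictReader.fieldnames to keep the columns selectable in the catalog.

import csv
import io

reader = csv.DictReader(io.StringIO('id,name\n'))  # header row only
samples = list(reader)                             # -> []
if not samples and reader.fieldnames is not None:
    samples.append({name: None for name in reader.fieldnames})
print(samples)                                     # [{'id': None, 'name': None}]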

def get_schema_for_table_local(conn, table_spec, sample_rate=1):
"""Function to generate schema for the provided data files"""
files = conn.get_files(table_spec)

if not files:
return {}

samples = json_schema.sample_files(conn, table_spec, files, sample_rate=sample_rate)

# Return empty if there is no schema generated
if not any(samples):
return {
'type': 'object',
'properties': {},
}

schema = se_schema.generate_schema(samples, table_spec)

data_schema = {
**schema,
**json_schema.get_sdc_columns()
}

return {
'type': 'object',
'properties': data_schema,
}
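
For orientation, a hedged illustration (not actual tap output) of the dict shape this function returns: the properties inferred from the sampled records merged with the _sdc_* columns that sync.py populates. The per-column sub-schemas shown are assumptions, not the real generated values.

expected_shape = {
    'type': 'object',
    'properties': {
        'id': {'type': ['null', 'integer', 'string']},      # assumed inferred types
        'string_col': {'type': ['null', 'string']},
        '_sdc_source_file': {'type': 'string'},              # exact sub-schemas are assumptions
        '_sdc_source_lineno': {'type': 'integer'},
        '_sdc_extra': {'type': 'array'},
    },
}
print(sorted(expected_shape['properties']))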


# Override singer_encodings' 'get_row_iterators' because:
# - the Tap supports files created without an extension
csv.get_row_iterators = get_row_iterators_local

# Override singer_encodings' 'sample_files' because:
# - the Tap loops over the files sorted by 'last_modified'
# - the Tap does not skip CSV and JSONL files that have the wrong extension
json_schema.sample_files = sample_files_local

# Override singer_encodings' 'sample_file' because:
# - the Tap does not support CSV files with duplicate headers
# - the Tap creates a sample record with 'None' values for CSV files that contain only headers
json_schema.sample_file = sample_file_local

def discover_streams(config):
streams = []

conn = client.connection(config)
prefix = format(config.get("user_dir", "./"))

tables = json.loads(config['tables'])
for table_spec in tables:
@@ -40,7 +209,7 @@ def discover_streams(config):
# generate schema
def get_schema(conn, table_spec):
LOGGER.info('Sampling records to determine table JSON schema "%s".', table_spec['table_name'])
schema = json_schema.get_schema_for_table(conn, table_spec)
schema = get_schema_for_table_local(conn, table_spec)
stream_md = metadata.get_standard_metadata(schema,
key_properties=table_spec.get('key_properties'),
replication_method='INCREMENTAL')
39 changes: 31 additions & 8 deletions tap_sftp/sync.py
@@ -27,9 +27,7 @@ def sync_stream(config, state, stream):
return 0
table_spec = table_spec[0]

files = conn.get_files(table_spec["search_prefix"],
table_spec["search_pattern"],
modified_since)
files = conn.get_files(table_spec, modified_since)

LOGGER.info('Found %s files to be synced.', len(files))

@@ -67,19 +65,44 @@ def sync_file(conn, f, stream, table_spec):
readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True)

records_synced = 0
tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', 'sdc_extra']
schema_dict = stream.schema.to_dict()

for reader in readers:
for file_extension, reader in readers:
with Transformer() as transformer:
# Starting line number for records, depending on the file type (CSV files have a header row)
row_start_line = 1 if file_extension == 'jsonl' else 2
for row in reader:
# Skipping the empty line
if len(row) == 0:
continue

custom_columns = {
'_sdc_source_file': f["filepath"],

# index zero, +1 for header row
'_sdc_source_lineno': records_synced + 2
'_sdc_source_lineno': records_synced + row_start_line
}

# For CSV files, the '_sdc_extra' is handled by 'restkey' in 'csv.DictReader'
# If the file is JSONL then prepare '_sdc_extra' column
if file_extension == 'jsonl':
sdc_extra = []

# Get the extra fields, i.e. (JSON keys - fields from the catalog - fields added by the tap)
extra_fields = set()
# Create '_sdc_extra' fields if the schema is not empty
if schema_dict.get('properties'):
extra_fields = set(row.keys()) - set(schema_dict.get('properties', {}).keys() - tap_added_fields)

# Prepare list of extra fields
for extra_field in extra_fields:
sdc_extra.append({extra_field: row.get(extra_field)})
# If the record contains extra fields, then add the '_sdc_extra' column
if extra_fields:
custom_columns['_sdc_extra'] = sdc_extra

rec = {**row, **custom_columns}

to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata))
to_write = transformer.transform(rec, schema_dict, metadata.to_map(stream.metadata))

singer.write_record(stream.tap_stream_id, to_write)
records_synced += 1
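
A self-contained sketch (invented schema and record) of the _sdc_extra bookkeeping in sync_file above: any key in a JSONL row that is neither in the catalog schema nor one of the tap-added columns is collected into '_sdc_extra'. (Separately, '_sdc_source_lineno' starts at 1 for JSONL because there is no header row.)

schema_properties = {'id': {}, 'string_col': {}, '_sdc_source_file': {}, '_sdc_source_lineno': {}}
tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', '_sdc_extra']

row = {'id': 1, 'string_col': 'a', 'surprise_col': 'x'}     # hypothetical JSONL record
extra_fields = set(row) - (set(schema_properties) - set(tap_added_fields))
sdc_extra = [{field: row[field]} for field in extra_fields]
print(sdc_extra)                                            # [{'surprise_col': 'x'}]
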
26 changes: 26 additions & 0 deletions tests/base.py
@@ -32,6 +32,10 @@ def get_test_connection(self):
def random_string_generator(self, size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for x in range(size))

def generate_max_size_csv(self):
"""Generate field with max size"""
return [[1, 'a'*131074]]

def generate_simple_csv_lines_typeA(self, num_lines):
lines = []
for int_value in range(num_lines):
@@ -54,6 +58,28 @@ def generate_simple_csv_lines_typeC(self, num_lines):
lines.append([int_value, self.random_string_generator(), int_value*5, utils.strftime(start_datetime), int_value + random.random()])
return lines

def generate_simple_jsonl_lines_typeA(self, num_lines):

Review comment: Add code comment for each function.

Author reply: Added comment for data generation functions.

lines = []
for int_value in range(num_lines):
lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5})
return lines

def generate_simple_jsonl_lines_typeB(self, num_lines):
lines = []
start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
for int_value in range(num_lines):
start_datetime = start_datetime + timedelta(days=5)
lines.append({"id": int_value, "string_col": self.random_string_generator(), "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()})
return lines

def generate_simple_jsonl_lines_typeC(self, num_lines):
lines = []
start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
for int_value in range(num_lines):
start_datetime = start_datetime + timedelta(days=5)
lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5, "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()})
return lines

def isdir(path, client):
try:
return S_ISDIR(client.stat(path).st_mode)
3 changes: 2 additions & 1 deletion tests/test_sftp_empty_csv_in_gz.py
@@ -61,9 +61,10 @@ def setUp(self):
headers = file_group['headers']
directory = file_group['directory']
for filename in file_group['files']:
file_to_gzip = ".".join(filename.split(".")[:-1])
client.chdir(directory)
with client.open(filename, 'w') as direct_file:
with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file:
with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file:
# Write rows only if 'num_rows' is set; otherwise this creates an empty 'csv.gz' file
if file_group.get('num_rows'):
with io.TextIOWrapper(gzip_file, encoding='utf-8') as f:
3 changes: 2 additions & 1 deletion tests/test_sftp_gzip.py
@@ -69,9 +69,10 @@ def setUp(self):
headers = file_group['headers']
directory = file_group['directory']
for filename in file_group['files']:
file_to_gzip = ".".join(filename.split(".")[:-1])
client.chdir(directory)
with client.open(filename, 'w') as direct_file:
with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file:
with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file:
with io.TextIOWrapper(gzip_file, encoding='utf-8') as f:
writer = csv.writer(f)
lines = [headers] + file_group['generator'](file_group['num_rows'])
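
The two setUp changes above pass filename= (the name minus its '.gz' suffix) to gzip.GzipFile, which writes that inner name into the gzip header's FNAME field rather than relying on whatever name, if any, the SFTP file object exposes. A self-contained sketch (made-up file name) of what that argument changes on the wire:

import gzip
import io

buf = io.BytesIO()
with gzip.GzipFile(filename='data.csv', fileobj=buf, mode='w') as gz:
    gz.write(b'id,name\n1,alpha\n')

raw = buf.getvalue()
assert raw[3] & 0x08                         # FLG byte has the FNAME bit set
# FNAME is a NUL-terminated string starting at byte 10 of the header
print(raw[10:raw.index(b'\x00', 10)])        # b'data.csv'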