This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Save cleaned data to tsv to make upstream clean up easier #1126

Draft
wants to merge 3 commits into main
111 changes: 100 additions & 11 deletions ingestion_server/ingestion_server/cleanup.py
@@ -3,9 +3,10 @@

This includes cleaning up malformed URLs and filtering out undesirable tags.
"""

import csv
import logging as log
import multiprocessing
import re as regex
import time
import uuid
from urllib.parse import urlparse
@@ -72,6 +73,18 @@ class CleanupFunctions:
Cleanup functions are dispatched in the _cleanup_config dictionary.
"""

@staticmethod
def cleanup_wiki_title(title):
"""
Remove the "File:" prefix and the image filetype suffix from the title
if it exists. If no change is made, return None.
"""
pat = regex.compile(r"File:?(.*?)(?:\.(jpg|jpeg|png|gif|bmp|svg))?$")
if match := pat.match(title):
clean_title = match.group(1).replace("'", "''")
return f"'{clean_title}'"
return None

@staticmethod
def cleanup_url(url, tls_support):
"""
@@ -147,13 +160,29 @@ def cleanup_tags(tags):
"creator_url": CleanupFunctions.cleanup_url,
"foreign_landing_url": CleanupFunctions.cleanup_url,
}
}
},
"wikimedia": {
"fields": {
"title": CleanupFunctions.cleanup_wiki_title,
}
},
}
}
}
}


def _get_cleanable_fields(table):
"""
Extract global and source-specific field names from
_cleanup_config for a specific table.
"""
cleanable_fields = []
for source in _cleanup_config["tables"][table]["sources"].values():
cleanable_fields += list(source["fields"].keys())
return cleanable_fields


class TlsTest:
"""
Test URLs to add the correct protocol when missing and use HTTPS when available.
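As a small illustration (not part of the diff), with the configuration above _get_cleanable_fields merges the global "*" fields with the wikimedia-specific title field into one list:

_get_cleanable_fields("image")
# ["tags", "url", "creator_url", "foreign_landing_url", "title"]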
@@ -184,19 +213,31 @@ def test_tls_supported(cls, url):
return True


def _clean_data_worker(rows, temp_table, sources_config):
def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
log.info("Starting data cleaning worker")
global_field_to_func = sources_config["*"]["fields"]
worker_conn = database_connect()
log.info("Data cleaning worker connected to database")
write_cur = worker_conn.cursor(cursor_factory=DictCursor)
log.info(f"Cleaning {len(rows)} rows")
tls_cache = {}
# Seed the cache with hosts that are already known to support TLS
tls_cache = {
"www.flickr.com": True,
"commons.wikimedia.org": True,
"https://www.eol.org/": True,
".geograph.org.uk": True,
".eol.org": True,
".digitaltmuseum.org": True,
"www.geograph.org.uk": True,
}

start_time = time.time()
cleaned_values = {field: [] for field in all_fields}
for row in rows:
# Map fields that need updating to their cleaning functions
source = row["source"]
_id = row["id"]
identifier = row["identifier"]
if source in sources_config:
source_field_to_func = sources_config[source]["fields"]
# Merge source-local and global function field mappings
@@ -220,6 +261,8 @@ def _clean_data_worker(rows, temp_table, sources_config):
update_field_expressions = []
for field in cleaned_data:
update_field_expressions.append(f"{field} = {cleaned_data[field]}")
cleaned_values[field].append((identifier, cleaned_data[field]))

if len(update_field_expressions) > 0:
update_query = f"""UPDATE {temp_table} SET
{', '.join(update_field_expressions)} WHERE id = {_id}
@@ -233,15 +276,42 @@ def _clean_data_worker(rows, temp_table, sources_config):
end_time = time.time()
total_time = end_time - start_time
log.info(f"Worker finished batch in {total_time}")
return True
return cleaned_values


def save_cleaned_data(results):
log.info("Saving cleaned data...")
start_time = time.time()

results_to_save: dict[str, list[tuple[str, str | Json]]] = {}
# Results is a list of dicts, where each dict is a mapping of field name to
# a list of tuples of (identifier, cleaned_value). There are as many dicts
# as there are workers. We need to merge the lists of tuples for each field
# name.
for result in results:
for field in result:
if field not in results_to_save:
results_to_save[field] = []
results_to_save[field].extend(result[field])
cleanup_counts = {}
for field, cleaned_items in results_to_save.items():
cleanup_counts[field] = len(cleaned_items) if cleaned_items else 0
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

end_time = time.time()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time}")
return cleanup_counts


def clean_image_data(table):
"""
Clean up data loaded from upstream that is unsuitable for prod before going live.

:param table: The staging table for the new data
:param upstream_db: A dict specifying the connection details of the upstream DB
:return: None
"""

@@ -262,7 +332,8 @@ def clean_image_data(table):
fields_to_clean.add(f)

cleanup_selection = (
f"SELECT id, source, " f"{', '.join(fields_to_clean)} from temp_import_{table}"
f"SELECT id, identifier, source, "
f"{', '.join(fields_to_clean)} from temp_import_{table}"
)
log.info(f'Running cleanup on selection "{cleanup_selection}"')
conn = database_connect(autocommit=True)
@@ -281,6 +352,8 @@ def clean_image_data(table):
jobs = []
num_workers = multiprocessing.cpu_count()
num_cleaned = 0
cleaned_counts_by_field = {field: 0 for field in fields_to_clean}

while batch:
# Divide updates into jobs for parallel execution.
batch_start_time = time.time()
@@ -294,22 +367,38 @@ def clean_image_data(table):
end = job_size * n
last_end = end
# Arguments for parallel _clean_data_worker calls
jobs.append((batch[start:end], temp_table, source_config))
jobs.append(
(
batch[start:end],
temp_table,
source_config,
_get_cleanable_fields("image"),
)
)
pool = multiprocessing.Pool(processes=num_workers)
log.info(f"Starting {len(jobs)} cleaning jobs")
conn.commit()
pool.starmap(_clean_data_worker, jobs)
results = pool.starmap(_clean_data_worker, jobs)
batch_cleaned_counts = save_cleaned_data(results)
for field in batch_cleaned_counts:
cleaned_counts_by_field[field] += batch_cleaned_counts[field]
pool.close()
num_cleaned += len(batch)
batch_end_time = time.time()
rate = len(batch) / (batch_end_time - batch_start_time)
log.info(f"Batch finished, records/s: cleanup_rate={rate}")
log.info(f"Fetching next batch. Records cleaned so far: {num_cleaned}")
log.info(
f"Fetching next batch. Records cleaned so far: {num_cleaned},"
f"counts: {batch_cleaned_counts}"
)
jobs = []
batch = iter_cur.fetchmany(size=CLEANUP_BUFFER_SIZE)
conn.commit()
iter_cur.close()
conn.close()
end_time = time.time()
cleanup_time = end_time - start_time
log.info(f"Cleaned all records in {cleanup_time} seconds")
log.info(
f"Cleaned all records in {cleanup_time} seconds,"
f"counts: {cleaned_counts_by_field}"
)
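Since the point of writing these per-field TSV files is to make the same clean-up reproducible upstream, here is a minimal sketch of a possible consumer. It is an assumption-laden illustration, not part of this PR: the upstream table name (image), connection string, and file path are hypothetical, and psycopg2 is used because the ingestion server already depends on it. Each TSV row is (identifier, cleaned value), and the cleaned value is written as an SQL-ready literal, mirroring the UPDATE built in _clean_data_worker.

import csv

import psycopg2


def apply_cleaned_field(tsv_path, field, dsn):
    # Replay a <field>.tsv file produced by save_cleaned_data against an
    # assumed upstream "image" table keyed by identifier.
    conn = psycopg2.connect(dsn)
    with conn, conn.cursor() as cur, open(tsv_path, newline="") as f:
        for identifier, value in csv.reader(f, delimiter="\t"):
            # The value column already holds a quoted SQL literal (or JSON),
            # so it is interpolated directly, the same way _clean_data_worker
            # builds its UPDATE statements.
            cur.execute(
                f"UPDATE image SET {field} = {value} "
                f"WHERE identifier = '{identifier}'"
            )
    conn.close()


# Hypothetical usage: apply the cleaned url values to the upstream catalog.
apply_cleaned_field("url.tsv", "url", "dbname=openledger user=deploy")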