Optimize harvesting flow and fix the item title mismatch issue
xinli-cai committed Sep 8, 2023
1 parent 5425791 commit 7b9c03f
Showing 3 changed files with 493 additions and 416 deletions.
59 changes: 59 additions & 0 deletions Export-to-csv.py
@@ -0,0 +1,59 @@

import boto3
import logging
from botocore.exceptions import ClientError
import pandas as pd
import io
import os

file_name = "records.parquet"
bucket_name = "webpresence-geocore-geojson-to-parquet-dev"


# Change directory
# Get the current working directory
current_directory = os.getcwd()
print(f"Current Directory: {current_directory}")

# Optionally change the working directory before saving the CSV
new_directory = ""  # Replace with the directory you want to switch to
if new_directory:
    os.chdir(new_directory)
    current_directory = os.getcwd()
    print(f"New Directory: {current_directory}")


# Function to open an S3 file by bucket and file name and return the parquet as a pandas DataFrame
def open_S3_file_as_df(bucket_name, file_name):
    """Open an S3 parquet file by bucket and file name and return it as a pandas DataFrame
    :param bucket_name: Bucket name
    :param file_name: Specific file name to open
    :return: contents of the file as a pandas DataFrame
    """
    try:
        s3 = boto3.resource('s3')
        obj = s3.Object(bucket_name, file_name)
        body = obj.get()['Body'].read()
        df = pd.read_parquet(io.BytesIO(body))
        print(f'Loading {file_name} from {bucket_name} to pandas dataframe')
        return df
    except ClientError as e:
        logging.error(e)
        raise

df = open_S3_file_as_df(bucket_name, file_name)
print(f'The shape of the raw metadata parquet dataset is {df.shape}')

"""
# Add a new column to log the process, and loop through the pandas rows to assign values
df['process_log'] = ''
## Loop through the DataFrame and update the new column based on processing condition 'Fail' or 'Success'
for index, row in df.iterrows():
if Transfromed == True:
df.at[index, 'process_log'] = 'Success' # or 1
else:
df.at[index, 'process_log'] = 'Fail' # or 0
"""
# Save all the records as a CSV to local path
save_path = os.path.join(os.getcwd(), 'records.csv')
df.to_csv(save_path)
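
The commented-out block above sketches a process log column; a minimal runnable version, using a placeholder was_transformed predicate that is not part of this commit, could look like:

def was_transformed(row):
    # Placeholder predicate (assumption, not from this commit): treat rows
    # with no missing values as successfully transformed.
    return not row.isna().any()

df['process_log'] = df.apply(
    lambda row: 'Success' if was_transformed(row) else 'Fail', axis=1
)
print(df['process_log'].value_counts())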
126 changes: 78 additions & 48 deletions stac-to-geocore/app.py
@@ -49,40 +49,58 @@ def lambda_handler(event, context):

# Before harvesting the STAC api, we check the root api connectivity first
try:
response = requests.get(f'{api_root}/collections/')
print(response)
response_root = requests.get(f'{api_root}')
print(f'Connection to root api is okay with status {response_root}')
except:
error_msg = 'Connectivity issue: error trying to access: ' + api_root + '/collections'
error_msg = 'Connectivity issue: error trying to access the root api: ' + api_root

# Start the harvest and translation process if connection is okay
if response.status_code == 200:
if response_root.status_code == 200:
#Delete previous harvest included in lastRun.txt
e = delete_stac_s3(bucket_geojson=geocore_to_parquet_bucket_name, bucket_template=geocore_template_bucket_name)
if e != None:
error_msg += e
# Create a new log file and write to it for each successful harvest
print('Creating a new lastRun.txt')
with open('/tmp/lastRun.txt', 'w') as f:
#Catalog
response_root = requests.get(f'{api_root}/')
# Catalog Level
#root_data = json.loads(response_root.text)
root_data = response_root.json()
root_id = root_data['id']
root_data_json = response_root.json()
root_id = root_data_json['id']
if not root_id.isspace():
root_id = root_id.replace(' ', '-')
root_des = root_data['description']
root_links = root_data['links']
root_des = root_data_json['description']
root_links = root_data_json['links']
# GeoCore properties bounding box is required for the frontend; here we use the first collection
# TBD: using the first collection's bounding box could cause issues when collections have different extents; a solution is required.
str_data = response.json()
collection_data = str_data['collections']
coll_bbox = collection_data[1]['extent']['spatial']['bbox'][0]
response_collection = requests.get(f'{api_root}/collections/')
collection_data_list = response_collection.json()['collections']
root_bbox = collection_data_list[0]['extent']['spatial']['bbox'][0]

# Get null geocore features body as a dictionary
geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name)
# print(f'This is the geocore_features_dict when it is loaded \n {geocore_features_dict}')
# Mapping to geocore features geometry and properties
root_geometry_dict =to_features_geometry(geocore_features_dict, bbox=coll_bbox, geometry_type='Polygon')
root_properties_dict = root_to_features_properties(geocore_features_dict,root_name, root_links, root_id, source, root_des, coll_bbox, status,maintenance, useLimits_en,useLimits_fr,spatialRepresentation,contact, type_data,topicCategory)
root_geometry_dict = to_features_geometry(geocore_features_dict, bbox=root_bbox, geometry_type='Polygon')
# Prepare the parameters required for the mapping functions
params = {
'root_name': root_name,
'root_links': root_links,
'root_id': root_id,
'root_des': root_des,
'root_bbox': root_bbox,
'source': source,
'status': status,
'maintenance': maintenance,
'useLimits_en': useLimits_en,
'useLimits_fr': useLimits_fr,
'spatialRepresentation': spatialRepresentation,
'contact': contact,
'type_data': type_data,
'topicCategory': topicCategory
}
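# Design note: bundling the shared harvest settings into a single params dict keeps the root-, collection-, and item-level mapping calls short and consistent.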
#print(f"This is the geocore_features_dict when it is in the params before root_to_features_properties\n {params['geocore_features_dict']}")
root_properties_dict = root_to_features_properties(params, geocore_features_dict)
# Update the geocore body and finish mapping
root_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict=root_properties_dict, geometry_dict=root_geometry_dict)
# Upload the STAC GeoCore record to S3
@@ -93,24 +111,29 @@ def lambda_handler(event, context):
f.write(f"{root_upload}\n")

# Collection mapping
for coll_dict in collection_data:
coll_id, coll_bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr = get_collection_fields(coll_dict=coll_dict)

coll_features_dict = get_geocore_template(geocore_template_bucket_name, geocore_template_name)
coll_geometry_dict =to_features_geometry(geocore_features_dict=coll_features_dict, bbox=coll_bbox, geometry_type='Polygon')
coll_properties_dict = to_features_properties(geocore_features_dict=coll_features_dict, coll_dict=coll_dict, item_dict=None,stac_type='collection', root_name=root_name, root_id = root_id,source=source,
status=status,maintenance=maintenance, useLimits_en=useLimits_en,
useLimits_fr=useLimits_fr,spatialRepresentation=spatialRepresentation,contact=contact, type_data=type_data,topicCategory=topicCategory)
coll_geocore_updated = update_geocore_dict(geocore_features_dict=coll_features_dict, properties_dict =coll_properties_dict, geometry_dict=coll_geometry_dict)

coll_count = 0
for coll_dict in collection_data_list:
# Reload the null GeoCore template
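# A fresh template per collection prevents values mapped in a previous iteration from leaking into the next record.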
geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name)
coll_extent = coll_dict.get('extent', {})
coll_bbox = coll_extent.get('spatial', {}).get('bbox', [None])[0]
coll_geometry_dict = to_features_geometry(geocore_features_dict, bbox=coll_bbox, geometry_type='Polygon')
coll_properties_dict = coll_to_features_properties(coll_dict=coll_dict, params=params, geocore_features_dict=geocore_features_dict)
coll_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict =coll_properties_dict, geometry_dict=coll_geometry_dict)

coll_id = coll_dict.get('id')
coll_name = source + '-' + coll_id + '.geojson'
msg = upload_file_s3(coll_name, bucket=geocore_to_parquet_bucket_name, json_data=coll_geocore_updated, object_name=None)
if msg == True:
print(f'Finished mapping Collection : {coll_id}, and uploaded the file to bucket: {geocore_to_parquet_bucket_name}')
coll_count += 1
print(f'Mapping Collection {coll_count}: {coll_id}. Finished and uploaded the collection to bucket: {geocore_to_parquet_bucket_name}')
f.write(f"{coll_name}\n")

# Items with pagination
pages = search_pages_get(url=api_root +'/search')
item_count = 0
# Get the keywords, description, and titles for all the collections as a dictionary
coll_id_dict = create_coll_dict(api_root)
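# Looking up each item's parent collection in coll_id_dict is what fixes the item title mismatch this commit targets.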
for page in pages:
#Each page has 30 items
r = requests.get(page)
@@ -119,26 +142,18 @@
for item in items_list:
#item is a dict
item_id, item_bbox, item_links, item_assets, item_properties,coll_id = get_item_fields(item)
print(f'Start to mapping item_id: {item_id}, collection_id : {coll_id}, title: {title_en}')
geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name)

item_geometry_dict = to_features_geometry(geocore_features_dict=geocore_features_dict, bbox=item_bbox, geometry_type='Polygon')

"""
Map items to collections
1) get collection id from 'collection'
2) Pre-build a dictionaru of {collection id: collection name}
3) Fix to_features_properties lin 176-line199
"""
item_properties_dict = item_to_features_properties(params=params, geocore_features_dict=geocore_features_dict, item_dict=item, coll_id_dict=coll_id_dict)

#TODO add error handling for the item mapping to geocore
item_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name)
item_geometry_dict =to_features_geometry(geocore_features_dict=item_features_dict, bbox=item_bbox, geometry_type='Polygon')
item_properties_dict = to_features_properties(geocore_features_dict=item_features_dict, coll_dict=coll_dict, item_dict=item,stac_type='item', root_name=root_name, root_id=root_id, source=source,
status=status,maintenance=maintenance, useLimits_en=useLimits_en,
useLimits_fr=useLimits_fr,spatialRepresentation=spatialRepresentation,contact=contact, type_data=type_data,topicCategory=topicCategory)
item_geocore_updated = update_geocore_dict(geocore_features_dict=item_features_dict, properties_dict =item_properties_dict ,geometry_dict=item_geometry_dict)
item_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict=item_properties_dict, geometry_dict=item_geometry_dict)
item_name = source + '-' + coll_id + '-' + item_id + '.geojson'
msg = upload_file_s3(item_name, bucket=geocore_to_parquet_bucket_name, json_data=item_geocore_updated, object_name=None)
if msg == True:
print(f'Finished mapping item : {item_id}, uploaded the file to bucket: {geocore_to_parquet_bucket_name}')
item_count += 1
print(f'Mapping item {item_count}: {item_id}. Finished and uploaded the item to bucket: {geocore_to_parquet_bucket_name}')
f.write(f"{item_name}\n")
f.close()
msg = upload_file_s3(filename='lastRun.txt', bucket=geocore_template_bucket_name, json_data = None, object_name=None)
@@ -153,11 +168,26 @@ def lambda_handler(event, context):
# requires open_file_s3()
def get_geocore_template(geocore_template_bucket_name,geocore_template_name):
"""Getting GeoCore null template from S3 bucket
:param geocore_template_bucket_name: bucket name that stores the geocore template file
:param geocore_template_name: geocore template file name
:return: geocore feature in dictionary format
Parameters:
- geocore_template_bucket_name: S3 bucket name that stores the GeoCore template.
- geocore_template_name: Name of the GeoCore template file.
Returns:
- A dictionary containing the GeoCore feature, or None if an error occurs.
"""
template= open_file_s3(geocore_template_bucket_name, geocore_template_name)
geocore_dict = json.loads(template)
geocore_features_dict = geocore_dict['features'][0]
return geocore_features_dict
try:
template = open_file_s3(geocore_template_bucket_name, geocore_template_name)
if not template:
logging.error("Template not found.")
return None
geocore_dict = json.loads(template)
return geocore_dict['features'][0]
except ClientError as e:
logging.error(f"An error occurred while accessing S3: {e}")
return None
except json.JSONDecodeError as e:
logging.error(f"An error occurred while decoding JSON: {e}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
return None
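
Since the reworked helper now returns None on failure, callers in lambda_handler should guard before indexing into the template. A minimal sketch (the error handling shown is illustrative, not part of this commit):

geocore_features_dict = get_geocore_template(geocore_template_bucket_name, geocore_template_name)
if geocore_features_dict is None:
    # Abort early rather than failing later with a KeyError.
    error_msg += 'Could not load the GeoCore template; harvest aborted. '
else:
    # Proceed with to_features_geometry / update_geocore_dict as above.
    ...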
