From 7b9c03fcd5d455bb103c18ed56c5adbe6849f794 Mon Sep 17 00:00:00 2001 From: Xinli Cai Date: Thu, 7 Sep 2023 22:36:07 -0400 Subject: [PATCH] Optimize harvesting flow and fix the item title miss-match issue --- Export-to-csv.py | 59 +++ stac-to-geocore/app.py | 126 +++-- stac-to-geocore/stac_to_geocore.py | 724 ++++++++++++++--------------- 3 files changed, 493 insertions(+), 416 deletions(-) create mode 100644 Export-to-csv.py diff --git a/Export-to-csv.py b/Export-to-csv.py new file mode 100644 index 0000000..25b186f --- /dev/null +++ b/Export-to-csv.py @@ -0,0 +1,59 @@ + +import boto3 +import logging +from botocore.exceptions import ClientError +import pandas as pd +import io +import os + +file_name = "records.parquet" +bucket_name = "webpresence-geocore-geojson-to-parquet-dev" + + +# Change directory +# Get the current working directory +current_directory = os.getcwd() +print(f"Current Directory: {current_directory}") + +# Change the working directory +new_directory = " " # Replace with the directory you want to switch to +os.chdir(new_directory) +current_directory = os.getcwd() +print(f"New Directory: {current_directory}") + + +# Function to open a S3 file from bucket and filename and return the parquet as pandas dataframe +def open_S3_file_as_df(bucket_name, file_name): + """Open a S3 parquet file from bucket and filename and return the parquet as pandas dataframe + :param bucket_name: Bucket name + :param file_name: Specific file name to open + :return: body of the file as a string + """ + try: + s3 = boto3.resource('s3') + object = s3.Object(bucket_name, file_name) + body = object.get()['Body'].read() + df = pd.read_parquet(io.BytesIO(body)) + print(f'Loading {file_name} from {bucket_name} to pandas dataframe') + return df + except ClientError as e: + logging.error(e) + return e + +df = open_S3_file_as_df(bucket_name, file_name) +print(f'The shape of the raw metadata parquet dataset is {df.shape}') + +""" +# Add a new column to log the process, and loop through the pandas rows to assign values +df['process_log'] = '' + +## Loop through the DataFrame and update the new column based on processing condition 'Fail' or 'Success' +for index, row in df.iterrows(): + if Transfromed == True: + df.at[index, 'process_log'] = 'Success' # or 1 + else: + df.at[index, 'process_log'] = 'Fail' # or 0 +""" +# Save all the records as a CSV to local path +save_path = os.path.join(os.getcwd(), 'records.csv') +df.to_csv(save_path) diff --git a/stac-to-geocore/app.py b/stac-to-geocore/app.py index 0cf9560..8a31a4b 100644 --- a/stac-to-geocore/app.py +++ b/stac-to-geocore/app.py @@ -49,13 +49,13 @@ def lambda_handler(event, context): # Before harvesting the STAC api, we check the root api connectivity first try: - response = requests.get(f'{api_root}/collections/') - print(response) + response_root = requests.get(f'{api_root}') + print(f'Connection to root api is okay with status {response_root}') except: - error_msg = 'Connectivity issue: error trying to access: ' + api_root + '/collections' + error_msg = 'Connectivity issue: error trying to access the root api: ' + api_root # Start the harvest and translation process if connection is okay - if response.status_code == 200: + if response_root.status_code == 200: #Delete previous harvest included in lastRun.txt e = delete_stac_s3(bucket_geojson=geocore_to_parquet_bucket_name, bucket_template=geocore_template_bucket_name) if e != None: @@ -63,26 +63,44 @@ def lambda_handler(event, context): # Create a new log file and write into log file for each sucessfull harvest print('Creating a new lastRun.txt') with open('/tmp/lastRun.txt', 'w') as f: - #Catalog - response_root = requests.get(f'{api_root}/') + # Catalog Level #root_data = json.loads(response_root.text) - root_data = response_root.json() - root_id = root_data['id'] + root_data_json = response_root.json() + root_id = root_data_json['id'] if root_id.isspace()==False: root_id=root_id.replace(' ', '-') - root_des = root_data['description'] - root_links = root_data['links'] + root_des = root_data_json['description'] + root_links = root_data_json['links'] # GeoCore properties bounding box is a required for frontend, here we use the first collection #TBD using first collection bounding box could cause potential issues when collections have different extent, a solution is required. - str_data = response.json() - collection_data = str_data['collections'] - coll_bbox = collection_data[1]['extent']['spatial']['bbox'][0] + response_collection = requests.get(f'{api_root}/collections/') + collection_data_list = response_collection.json()['collections'] + root_bbox = collection_data_list[1]['extent']['spatial']['bbox'][0] # Get null geocore features body as a dictionary geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name) + # print(f'This is the geocore_features_dict when it is loaded \n {geocore_features_dict}') # Mapping to geocore features geometry and properties - root_geometry_dict =to_features_geometry(geocore_features_dict, bbox=coll_bbox, geometry_type='Polygon') - root_properties_dict = root_to_features_properties(geocore_features_dict,root_name, root_links, root_id, source, root_des, coll_bbox, status,maintenance, useLimits_en,useLimits_fr,spatialRepresentation,contact, type_data,topicCategory) + root_geometry_dict = to_features_geometry(geocore_features_dict, bbox=root_bbox, geometry_type='Polygon') + # Perpare for parametes required for the function + params = { + 'root_name': root_name, + 'root_links': root_links, + 'root_id': root_id, + 'root_des': root_des, + 'root_bbox': root_bbox, + 'source': source, + 'status':status, + 'maintenance':maintenance, + 'useLimits_en': useLimits_en, + 'useLimits_fr': useLimits_fr, + 'spatialRepresentation': spatialRepresentation, + 'contact': contact, + 'type_data': type_data, + 'topicCategory': topicCategory + } + #print(f"This is the geocore_features_dict when it is in the params before root_to_features_properties\n {params['geocore_features_dict']}") + root_properties_dict = root_to_features_properties(params, geocore_features_dict) # Update the geocore body and finish mapping root_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict =root_properties_dict ,geometry_dict=root_geometry_dict) # upload the stac geocore to a S3 @@ -93,24 +111,29 @@ def lambda_handler(event, context): f.write(f"{root_upload}\n") # Collection mapping - for coll_dict in collection_data: - coll_id, coll_bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr = get_collection_fields(coll_dict=coll_dict) - - coll_features_dict = get_geocore_template(geocore_template_bucket_name, geocore_template_name) - coll_geometry_dict =to_features_geometry(geocore_features_dict=coll_features_dict, bbox=coll_bbox, geometry_type='Polygon') - coll_properties_dict = to_features_properties(geocore_features_dict=coll_features_dict, coll_dict=coll_dict, item_dict=None,stac_type='collection', root_name=root_name, root_id = root_id,source=source, - status=status,maintenance=maintenance, useLimits_en=useLimits_en, - useLimits_fr=useLimits_fr,spatialRepresentation=spatialRepresentation,contact=contact, type_data=type_data,topicCategory=topicCategory) - coll_geocore_updated = update_geocore_dict(geocore_features_dict=coll_features_dict, properties_dict =coll_properties_dict, geometry_dict=coll_geometry_dict) - + coll_count = 0 + for coll_dict in collection_data_list: + # Reload the null GeoCore template + geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name) + coll_extent = coll_dict.get('extent') + coll_bbox = coll_extent.get('spatial', {}).get('bbox', [None])[0] + coll_geometry_dict = to_features_geometry(geocore_features_dict, bbox=coll_bbox, geometry_type='Polygon') + coll_properties_dict = coll_to_features_properties(coll_dict=coll_dict, params=params, geocore_features_dict=geocore_features_dict) + coll_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict =coll_properties_dict, geometry_dict=coll_geometry_dict) + + coll_id = coll_dict.get('id') coll_name = source + '-' + coll_id + '.geojson' msg = upload_file_s3(coll_name, bucket=geocore_to_parquet_bucket_name, json_data=coll_geocore_updated, object_name=None) if msg == True: - print(f'Finished mapping Collection : {coll_id}, and uploaded the file to bucket: {geocore_to_parquet_bucket_name}') + coll_count += 1 + print(f'Mapping Collection {coll_count}: {coll_id}. Finished and Uploaded the collection to bucket: {geocore_to_parquet_bucket_name}') f.write(f"{coll_name}\n") #Item with paginate pages = search_pages_get(url=api_root +'/search') + item_count = 0 + # Get the keywords, description, and titles for all the collections as a dictionary + coll_id_dict = create_coll_dict(api_root) for page in pages: #Each page has 30 items r = requests.get(page) @@ -119,26 +142,18 @@ def lambda_handler(event, context): for item in items_list: #item is a dict item_id, item_bbox, item_links, item_assets, item_properties,coll_id = get_item_fields(item) - print(f'Start to mapping item_id: {item_id}, collection_id : {coll_id}, title: {title_en}') + geocore_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name) + + item_geometry_dict = to_features_geometry(geocore_features_dict=geocore_features_dict, bbox=item_bbox, geometry_type='Polygon') - """ - Map items to collections - 1) get collection id from 'collection' - 2) Pre-build a dictionaru of {collection id: collection name} - 3) Fix to_features_properties lin 176-line199 - """ + item_properties_dict = item_to_features_properties(params=params, geocore_features_dict=geocore_features_dict, item_dict=item, coll_id_dict=coll_id_dict) - #TODO add error handling for the item mapping to geocore - item_features_dict = get_geocore_template(geocore_template_bucket_name,geocore_template_name) - item_geometry_dict =to_features_geometry(geocore_features_dict=item_features_dict, bbox=item_bbox, geometry_type='Polygon') - item_properties_dict = to_features_properties(geocore_features_dict=item_features_dict, coll_dict=coll_dict, item_dict=item,stac_type='item', root_name=root_name, root_id=root_id, source=source, - status=status,maintenance=maintenance, useLimits_en=useLimits_en, - useLimits_fr=useLimits_fr,spatialRepresentation=spatialRepresentation,contact=contact, type_data=type_data,topicCategory=topicCategory) - item_geocore_updated = update_geocore_dict(geocore_features_dict=item_features_dict, properties_dict =item_properties_dict ,geometry_dict=item_geometry_dict) + item_geocore_updated = update_geocore_dict(geocore_features_dict=geocore_features_dict, properties_dict =item_properties_dict ,geometry_dict=item_geometry_dict) item_name = source + '-' + coll_id + '-' + item_id + '.geojson' msg = upload_file_s3(item_name, bucket=geocore_to_parquet_bucket_name, json_data=item_geocore_updated, object_name=None) if msg == True: - print(f'Finished mapping item : {item_id}, uploaded the file to bucket: {geocore_to_parquet_bucket_name}') + item_count += 1 + print(f'Maping item {item_count}: {item_id}. Finished and uploaded the item to bucket: {geocore_to_parquet_bucket_name}') f.write(f"{item_name}\n") f.close() msg = upload_file_s3(filename='lastRun.txt', bucket=geocore_template_bucket_name, json_data = None, object_name=None) @@ -153,11 +168,26 @@ def lambda_handler(event, context): # requires open_file_s3() def get_geocore_template(geocore_template_bucket_name,geocore_template_name): """Getting GeoCore null template from S3 bucket - :param geocore_template_bucket_name: bucket name tht stores the geocore template file - :param geocore_template_name: geocore template file name - :return: geocore feature in dictionary format + Parameters: + - geocore_template_bucket_name: S3 bucket name that stores the GeoCore template. + - geocore_template_name: Name of the GeoCore template file. + + Returns: + - A dictionary containing the GeoCore feature, or None if an error occurs. """ - template= open_file_s3(geocore_template_bucket_name, geocore_template_name) - geocore_dict = json.loads(template) - geocore_features_dict = geocore_dict['features'][0] - return geocore_features_dict \ No newline at end of file + try: + template= open_file_s3(geocore_template_bucket_name, geocore_template_name) + if not template: + logging.error("Template not found.") + return None + geocore_dict = json.loads(template) + return geocore_dict['features'][0] + except ClientError as e: + logging.error(f"An error occurred while accessing S3: {e}") + return None + except json.JSONDecodeError as e: + logging.error(f"An error occurred while decoding JSON: {e}") + return None + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + return None \ No newline at end of file diff --git a/stac-to-geocore/stac_to_geocore.py b/stac-to-geocore/stac_to_geocore.py index bfe3777..9fb812c 100644 --- a/stac-to-geocore/stac_to_geocore.py +++ b/stac-to-geocore/stac_to_geocore.py @@ -1,6 +1,7 @@ import json from datetime import datetime import re +import requests # Hardcoded variables for the STAC to GeoCore translation status = 'unknown' @@ -55,20 +56,57 @@ }] # STAC to GeoCore translation functions +def update_dict(target_dict, updates): + """Utility function to update a dictionary with new key-value pairs. + + Parameters: + - target_dict: The original dictionary to update. + - updates: A dictionary containing the updates. + + Returns: + - The updated dictionary. + """ + target_dict.update(updates) + return target_dict + +def update_geocore_dict(geocore_features_dict, properties_dict, geometry_dict): + """Update the GeoCore geocore_features_dict null template with the updated properties and geometry dictionaries. + + Parameters: + - geocore_features_dict: The initial GeoCore dictionary. + - properties_dict: The updated properties dictionary. + - geometry_dict: The updated geometry dictionary. + + Returns: + - A new GeoCore dictionary with updated features. + """ + if not isinstance(properties_dict, dict) or not isinstance(geometry_dict, dict): + raise ValueError("properties_dict and geometry_dict must be dictionaries.") + + updated_dict = geocore_features_dict.copy() + updated_dict = update_dict(updated_dict, {"properties": properties_dict, "geometry": geometry_dict}) + return { + "type": "FeatureCollection", + "features": [updated_dict] + } + #stac_to_feature_geometry -def to_features_geometry(geocore_features_dict, bbox, geometry_type='Polygon'): - """Mapping to GeoCore features geometry field +def to_features_geometry(geocore_features_dict, bbox, geometry_type='Polygon'): + """Mapping to GeoCore features geometry field. + :param bbox: list of bounding box [west, south, east, north] - :param geometry_type: string of item or collection type, always be Polygon + :param geometry_type: string of item or collection type, default is 'Polygon' """ geometry_dict = geocore_features_dict['geometry'] - geometry_dict.update({"type":geometry_type}) - west = round(bbox[0], 2) - south = round(bbox[1],2) - east = round(bbox[2],2) - north = round(bbox[3],2) + west, south, east, north = [round(coord, 2) for coord in bbox] coordinates=[[[west, south], [east, south], [east, north], [west, north], [west, south]]] - geometry_dict.update({"coordinates":coordinates}) + # Update the geometry dictionary + updates = { + "type": geometry_type, + "coordinates": coordinates + } + update_dict(geometry_dict, updates) + return geometry_dict # A function to map STAC links to GeoCore option @@ -83,52 +121,24 @@ def links_to_properties_options(links_list, id, root_name, title_en, title_fr, s return_list = [] root_name_en,root_name_fr = root_name.split('/') for var in links_list: - href = var.get('href') - rel = var.get('rel') - type_str = var.get('type') - name=var.get('title') - if type_str: - type_str=type_str.replace(';', ',') # for proper display on metadata page - # rel type: https://github.com/radiantearth/stac-spec/blob/master/item-spec/item-spec.md#relation-types - if rel == 'collection' or rel == 'dedrived_from': - continue - if rel == 'self'and stac_type != 'root': - name_en = 'Self - ' + id - name_fr = 'Soi - ' + id - elif rel == 'self' and stac_type == 'root': - name_en = 'Root - ' + root_name_en - name_fr = 'Racine - ' + root_name_fr - elif rel == 'root': - name_en = 'Root - ' + root_name_en - name_fr = 'Racine - ' + root_name_fr - elif stac_type == 'item' and rel == 'parent' and title_en!=None and title_fr!=None: - name_en = 'Parent - ' + title_en - name_fr = 'Parente - ' + title_fr - elif stac_type == 'collection' and rel == 'parent': - name_en = 'Parent links ' - name_fr = 'Parente liens' - elif rel == 'items': # this rel type is only for collection - name_en = 'Items listings' - name_fr = 'Èléments la liste' - elif name: - name_en = name - name_fr = name - else: - name_en = 'Unknown' - name_fr = 'Inconnue' - option_dic = { - "url": href, - "protocol": 'Unknown', - "name":{ - "en":name_en, - "fr":name_fr - }, - "description":{ - "en":'unknown' + ';' + type_str + ';' +'eng', - "fr":'unknown' + ';' + type_str + ';' +'fra' - } - } - return_list.append(option_dic) + href, rel, type_str, name = var.get('href'), var.get('rel'), var.get('type', '').replace(';', ','), var.get('title') + name_en, name_fr = { + 'collection': (None, None), + 'derived_from': (None, None), + 'self': ('Self - ' + id if stac_type != 'root' else 'Root - ' + root_name_en, 'Soi - ' + id if stac_type != 'root' else 'Racine - ' + root_name_fr), + 'root': ('Root - ' + root_name_en, 'Racine - ' + root_name_fr), + 'parent': ('Parent - ' + title_en if stac_type == 'item' and title_en else 'Parent links', 'Parente - ' + title_fr if stac_type == 'item' and title_fr else 'Parente liens'), + 'items': ('Items listings', 'Éléments la liste') + }.get(rel, (name if name else 'Unknown', name if name else 'Inconnue')) + + if name_en and name_fr: + option_dic = { + "url": href, + "protocol": 'Unknown', + "name": {"en": name_en, "fr": name_fr}, + "description": {"en": f'unknown;{type_str};eng', "fr": f'unknown;{type_str};fra'} + } + return_list.append(option_dic) return (return_list) # A function to map STAC assets to GeoCore option @@ -138,345 +148,323 @@ def assets_to_properties_options(assets_list): :return return list: geocore features properties option list """ return_list = [] - for var in assets_list: - var_dict = assets_list[var] - href = var_dict.get('href') - type_str= var_dict.get('type') - if type_str: - type_str=type_str.replace(';', ',') - name = var_dict.get('title') - if name: - try: - name_en,name_fr = name.split('/') - except: - name_en = name - name_fr = name + for var_dict in assets_list.values(): + href, type_str, name = var_dict.get('href'), var_dict.get('type', '').replace(';', ','), var_dict.get('title', 'Unknown/Inconnu') + name_en, name_fr = name.split('/') if '/' in name else (name, name) option_dic = { - "url": href, - "protocol": 'Unknown', - "name":{ - "en":'Asset - ' + name_en, - "fr":'Asset - ' + name_fr - }, - "description":{ - "en":'unknown' + ';' + type_str + ';' +'eng', - "fr":'unknown' + ';' + type_str + ';' +'fra' - } - } + "url": href, + "protocol": 'Unknown', + "name": {"en": f'Asset - {name_en}', "fr": f'Asset - {name_fr}'}, + "description": {"en": f'unknown;{type_str};eng', "fr": f'unknown;{type_str};fra'} + } return_list.append(option_dic) - return (return_list) + return return_list - -def get_collection_fields(coll_dict): - """Get the collection fields needed for the geocore mapping - :param coll_dict: dictionary of a singel STAC collection - """ +#root_to_features_properties +def root_to_features_properties(params, geocore_features_dict): + # Get the parameters + root_name = params['root_name'] + root_links = params['root_links'] + root_id = params['root_id'] + source = params['source'] + root_des = params['root_des'] + root_bbox = params['root_bbox'] + status = params['status'] + maintenance = params['maintenance'] + useLimits_en = params['useLimits_en'] + useLimits_fr = params['useLimits_fr'] + spatialRepresentation = params['spatialRepresentation'] + contact = params['contact'] + type_data = params['type_data'] + topicCategory = params['topicCategory'] - try: - coll_id = coll_dict['id'] - except KeyError: - coll_id = None - try: - coll_title = coll_dict['title'] - except KeyError: - coll_title = None - try: - coll_description = coll_dict['description'] - except KeyError: - coll_description = None - try: - coll_keywords = coll_dict['keywords'] - except KeyError: - coll_keywords = None - try: - coll_extent = coll_dict['extent'] - except KeyError: - coll_extent = None - try: - coll_links = coll_dict['links'] - except KeyError: - coll_links = None - try: - coll_assets = coll_dict['assets'] - except KeyError: - coll_assets = None - - # Get bbox and time - if coll_extent: - try: - coll_bbox = coll_extent['spatial']['bbox'][0] - time_begin = coll_extent['temporal']['interval'][0][0] - time_end = coll_extent['temporal']['interval'][0][1] - except: - coll_bbox = None - time_begin = None - time_end = None - else: - coll_bbox = None - time_begin = None - time_end = None - # get En and fr for des, keywords, and title - if coll_title: - try: - title_en,title_fr = coll_title.split('/') - except ValueError: - title_en = coll_title - title_fr = None - else: - title_en = coll_id # Title can not be none for geocoer search - title_fr = coll_id - if coll_description: - try: - description_en,description_fr = coll_description.split('/') - except ValueError: - description_en = coll_description - description_fr = None - else: - description_en = None - description_fr = None - if coll_keywords: - try: - keywords_en = coll_keywords[0:int(len(coll_keywords)/2)] - keywords_en = ', '.join(str(l) for l in keywords_en) - keywords_fr = coll_keywords[int(len(coll_keywords)/2): int(len(coll_keywords))] - keywords_fr = ', '.join(str(l) for l in keywords_fr) - except KeyError: - keywords_en = coll_keywords - keywords_fr = None - else: - keywords_en = None - keywords_fr = None - return coll_id, coll_bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr; + + properties_dict = geocore_features_dict['properties'] + root_name_en,root_name_fr = root_name.split('/') + #id + update_dict(properties_dict, {"id": f"{source}-root-{root_id}"}) + #title + update_dict(properties_dict['title'], {"en": f" Root - {root_name_en}"}) + update_dict(properties_dict['title'], {"fr": f" Racine - {root_name_fr}"}) -def get_item_fields(item_dict): - """Get the collection fields needed for the geocore mapping - :param item_dict: dictionary of a singel STAC item - """ - try: - item_id = item_dict['id'] - except KeyError: - item_id = None - try: - item_bbox = item_dict['bbox'] - except KeyError: - item_bbox = None - try: - item_links = item_dict['links'] - except KeyError: - item_links = None - try: - item_assets = item_dict['assets'] - except KeyError: - item_assets = None - try: - item_properties = item_dict['properties'] - except KeyError: - item_properties = None - try: - coll_id = item_dict['collection'] - except KeyError: - coll_id = None - return item_id, item_bbox, item_links, item_assets, item_properties, coll_id; + #options + links_list = links_to_properties_options(links_list=root_links, id=root_id, root_name=root_name, title_en=None, title_fr=None, stac_type='root') + options_list = links_list + options_list = [i for n, i in enumerate(options_list) if i not in options_list[n + 1:]] # delete duplicates + + #Descrption + en_desc = root_des + '.' + disclaimer_en if root_des else disclaimer_en + fr_desc = root_des + '.' + disclaimer_fr if root_des else disclaimer_fr + update_dict(properties_dict['description'], {'en': en_desc, 'fr': fr_desc}) + + #Keywords + keywords_common = 'SpatioTemporal Asset Catalog, stac' + update_dict(properties_dict['keywords'], {'en': f"{keywords_common}, {source}", 'fr': f"{keywords_common}, {source}"}) + + #Geometry + west, south, east, north = [round(coord, 2) for coord in root_bbox] + geometry_str = f"POLYGON(({west} {south}, {east} {south}, {east} {north}, {west} {north}, {west} {south}))" + update_dict(properties_dict, {"geometry": geometry_str}) + + # Other properties + update_dict(properties_dict, { + 'topicCategory': topicCategory, + 'type': type_data, + 'spatialRepresentation': spatialRepresentation, + 'status': status, + 'maintenance': maintenance, + 'contact': contact, + 'options': options_list, + 'useLimits': {'en': useLimits_en, 'fr': useLimits_fr}, + 'temporalExtent': {'end': 'Present', 'begin': '0001-01-01'} + }) + #parentIdentifier: None for STAC catalog + #date: None for STAC collection + #skipped: refsys, refSys_version + #skipped metadataStandard, metadataStandardVersion, metadataStandardVersion, graphicOverview, distributionFormat_name, distributionFormat_format + #skipped: accessConstraints, otherConstraints, dateStamp, dataSetURI, locale,language + #skipped: characterSet, environmentDescription,supplementalInformation + #skipped: credits, cited, distributor,sourceSystemName + return (properties_dict) -# stac_to_features_properties -#TODO implement *args and **kwargs for properties function -def to_features_properties(geocore_features_dict, coll_dict, item_dict,stac_type, root_name,root_id,source,status,maintenance, useLimits_en,useLimits_fr,spatialRepresentation,contact, type_data,topicCategory): +#collection_to_features_properties +def coll_to_features_properties(params, coll_dict,geocore_features_dict): + # Get the parameters + root_name = params['root_name'] + root_id = params['root_id'] + source = params['source'] + status = params['status'] + maintenance = params['maintenance'] + useLimits_en = params['useLimits_en'] + useLimits_fr = params['useLimits_fr'] + spatialRepresentation = params['spatialRepresentation'] + contact = params['contact'] + type_data = params['type_data'] + topicCategory = params['topicCategory'] + properties_dict = geocore_features_dict['properties'] - coll_id, bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr = get_collection_fields(coll_dict) - if stac_type == 'item': - item_id, bbox, item_links, item_assets, item_properties,coll_id = get_item_fields(item_dict) - #id - properties_dict.update({"id": source + '-' + coll_id + '-' + item_id}) - #title - item_date= datetime.strptime(item_properties['datetime'], '%Y-%m-%dT%H:%M:%SZ') - yr = item_date.strftime("%Y") - custom_coll = ["monthly-vegetation-parameters-20m-v1", "hrdem-lidar"] - if title_en != None and title_fr!= None and coll_id not in custom_coll: - properties_dict['title'].update({"en": yr + ' - ' + title_en}) - properties_dict['title'].update({"fr": yr + ' - ' + title_fr}) - elif title_en != None and title_fr!= None and coll_id == "monthly-vegetation-parameters-20m-v1": - properties_dict['title'].update({"en": item_id.split('-')[-1] + ' - ' + title_en}) - properties_dict['title'].update({"fr": item_id.split('-')[-1] + ' - ' + title_fr}) - print('test properti title is ', properties_dict['title']) - elif title_en != None and title_fr!= None and coll_id == "hrdem-lidar": - #title_header = re.search(r"(?<=-)[A-Za-z_]+", item_id).group().replace('_', ' ') - match = re.search(r"(?<=-)[A-Za-z_]+", item_id) - if match: - title_header = match.group().replace('_', ' ') - else:# Handle the case where no match is found, e.g., set a default value or raise a custom error - title_header = item_id - properties_dict['title'].update({"en": yr + ' ' + title_header + ' - '+ title_en}) - properties_dict['title'].update({"fr": yr + ' ' + title_header + ' - '+ title_fr}) - print('test properti title is ', properties_dict['title']) - - #parentIdentifier - properties_dict.update({"parentIdentifier": source + '-'+ coll_id}) - #date - if 'created' in item_properties.keys(): - item_created = item_properties['created'] - properties_dict['date']['published'].update({"text": 'publication; publication'}) - properties_dict['date']['published'].update({"date": item_created}) - properties_dict['date']['created'].update({"text": 'creation; création'}) - properties_dict['date']['created'].update({"date": item_created}) - #temporalExtent: begin is the datatime, hard coded 'Present'as end - properties_dict['temporalExtent'].update({"begin": item_date.strftime("%Y-%m-%d")}) - properties_dict['temporalExtent'].update({"end": 'Present'}) + + coll_id, coll_bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr = get_collection_fields(coll_dict) + #id + update_dict(properties_dict, {"id": source + '-' + coll_id}) + #title + if title_en != None and title_fr!= None: + update_dict(properties_dict, {'title':{'en':'Collection - ' + title_en, 'fr':'Collection - ' + title_fr}}) - #options - links_list = links_to_properties_options(links_list=item_links, id=item_id, root_name=root_name, title_en=title_en, title_fr=title_fr, stac_type='item') - if item_assets: - assets_list = assets_to_properties_options(assets_list=item_assets) - options_list = links_list+assets_list - else: - options_list = links_list - options_list = [i for n, i in enumerate(options_list) if i not in options_list[n + 1:]] # delete duplicates + #parentIdentifier: root id + update_dict(properties_dict, {"parentIdentifier": source + '-root-'+ root_id}) + #temporalExtent + time_begin_str = datetime.strptime(time_begin, '%Y-%m-%dT%H:%M:%SZ').strftime("%Y-%m-%d") if time_begin else '0001-01-01' + time_end_str = datetime.strptime(time_end, '%Y-%m-%dT%H:%M:%SZ').strftime("%Y-%m-%d") if time_end else 'Present' + temporal_extent_updates = {"begin": time_begin_str, "end": time_end_str} + update_dict(properties_dict['temporalExtent'], temporal_extent_updates) + + #options + links_list = links_to_properties_options(links_list=coll_links, id=coll_id, root_name=root_name, title_en=title_en, title_fr=title_fr, stac_type='collection') + assets_list = assets_to_properties_options(assets_list=coll_assets) if coll_assets else [] + options_list = links_list+assets_list + options_list = [i for n, i in enumerate(options_list) if i not in options_list[n + 1:]] # delete duplicates + - else: - #id - properties_dict.update({"id": source + '-' + coll_id}) - #title - if title_en != None and title_fr!= None: - properties_dict['title'].update({"en": 'Collection - ' + title_en}) - properties_dict['title'].update({"fr": 'Collection - ' + title_fr}) - #parentIdentifier: root id - properties_dict.update({"parentIdentifier": source + '-root-'+ root_id}) - # date: None for STAC collection - # Type: hardcoded - #temporalExtent - if time_begin: - time_begin= datetime.strptime(time_begin, '%Y-%m-%dT%H:%M:%SZ') - properties_dict['temporalExtent'].update({"begin": time_begin.strftime("%Y-%m-%d")}) - else: - properties_dict['temporalExtent'].update({"begin": '0001-01-01'}) - - if time_end: - time_end= datetime.strptime(time_end, '%Y-%m-%dT%H:%M:%SZ') - properties_dict['temporalExtent'].update({"end": time_end.strftime("%Y-%m-%d")}) - else: #hard code end 'Present', required for page to load - properties_dict['temporalExtent'].update({"end": 'Present'}) - - #options - links_list = links_to_properties_options(links_list=coll_links, id=coll_id, root_name=root_name, title_en=title_en, title_fr=title_fr, stac_type='item') - if coll_assets: - assets_list = assets_to_properties_options(assets_list=coll_assets) - options_list = links_list+assets_list - else: - options_list = links_list - options_list = [i for n, i in enumerate(options_list) if i not in options_list[n + 1:]] # delete duplicates - - # The shared attributes between Items and Collections - #descrption - if description_en!= None and description_fr != None: - properties_dict['description'].update({"en": description_en + ' ' + disclaimer_en}) - properties_dict['description'].update({"fr": description_fr + ' ' + disclaimer_fr}) - else: - properties_dict['description'].update({"en": disclaimer_en}) - properties_dict['description'].update({"fr": disclaimer_fr}) - #keywords - if keywords_en!= None and keywords_fr != None: - properties_dict['keywords'].update({"en": 'SpatioTemporal Asset Catalog, ' + 'stac, ' + keywords_en}) - properties_dict['keywords'].update({"fr": 'SpatioTemporal Asset Catalog, ' + 'stac, ' + keywords_fr}) - # topicCategory - properties_dict.update({"topicCategory": topicCategory}) - properties_dict.update({"type": type_data}) - #geometry - west = round(bbox[0], 2) - south = round(bbox[1],2) - east = round(bbox[2],2) - north = round(bbox[3],2) - geometry_str = "POLYGON((" + str(west) + " " + str(south) +', ' + str(east) +" "+ str(south) + ", " + str(east) +" "+ str(north) + ", " + str(west) +" "+ str(north) + ", " + str(west) +" "+ str(south) + "))" - properties_dict.update({"geometry":geometry_str}) - #Spatialrepresentation - properties_dict.update({"spatialRepresentation":spatialRepresentation}) + description_en_str = f"{description_en or ''} {disclaimer_en}" + description_fr_str = f"{description_fr or ''} {disclaimer_fr}" + keywords_en_str = f"SpatioTemporal Asset Catalog, stac, {keywords_en or ''}" + keywords_fr_str = f"SpatioTemporal Asset Catalog, stac, {keywords_fr or ''}" + + #Geometry + west, south, east, north = [round(coord, 2) for coord in coll_bbox] + geometry_str = f"POLYGON(({west} {south}, {east} {south}, {east} {north}, {west} {north}, {west} {south}))" + + # Other properties + update_dict(properties_dict, { + "topicCategory": topicCategory, + "type": type_data, + "spatialRepresentation":spatialRepresentation, + "status":status, + "maintenance":maintenance, + 'useLimits': {'en': useLimits_en, 'fr': useLimits_fr}, + 'contact': contact, + 'options': options_list, + 'description': {'en': description_en_str, 'fr': description_fr_str}, + 'keywords': {'en': keywords_en_str, 'fr': keywords_fr_str}, + "geometry": geometry_str, + + }) + + #skipped: date: None for STAC collection #skipped: refsys, refSys_version - properties_dict.update({"status":status}) - properties_dict.update({"maintenance":maintenance}) - # Skipped metadataStandard, metadataStandardVersion, metadataStandardVersion, graphicOverview, distributionFormat_name, distributionFormat_format - #useLimits - properties_dict['useLimits'].update({"en": useLimits_en}) - properties_dict['useLimits'].update({"fr": useLimits_fr}) + #skipped metadataStandard, metadataStandardVersion, metadataStandardVersion, graphicOverview, distributionFormat_name, distributionFormat_format #skipped: accessConstraints, otherConstraints, dateStamp, dataSetURI, locale,language #skipped: characterSet, environmentDescription,supplementalInformation - # Contact - properties_dict.update({'contact': contact}) - # Skipped: credits, cited, distributor,sourceSystemName + #skipped: credits, cited, distributor,sourceSystemName # options - properties_dict.update({'options': options_list}) return (properties_dict) -def root_to_features_properties(geocore_features_dict,root_name, root_links, root_id,source,root_des, coll_bbox, status,maintenance, useLimits_en,useLimits_fr,spatialRepresentation,contact, type_data,topicCategory): +def get_collection_fields(coll_dict): + """Get the collection fields needed for the geocore mapping + :param coll_dict: dictionary of a singel STAC collection + """ + # Directly extract values using .get() method + #.get() method allows you to provide a default value (in this case, None) if the key is not found. + coll_id = coll_dict.get('id') + coll_title = coll_dict.get('title') + coll_description = coll_dict.get('description') + coll_keywords = coll_dict.get('keywords') + coll_extent = coll_dict.get('extent') + coll_links = coll_dict.get('links') + coll_assets = coll_dict.get('assets') + + # Get bbox and time + coll_bbox, time_begin, time_end = None, None, None + if coll_extent: + coll_bbox = coll_extent.get('spatial', {}).get('bbox', [None])[0] + temporal_interval = coll_extent.get('temporal', {}).get('interval', [[None, None]])[0] + time_begin, time_end = temporal_interval + + # Get English and French for description, keywords, and title + title_en, title_fr = (coll_title.split('/') + [coll_id, coll_id])[:2] if coll_title else (coll_id, coll_id) + description_en, description_fr = (coll_description.split('/') + [None, None])[:2] if coll_description else (None, None) + + if coll_keywords: + half_length = len(coll_keywords) // 2 + keywords_en = ', '.join(str(l) for l in coll_keywords[:half_length]) + keywords_fr = ', '.join(str(l) for l in coll_keywords[half_length:]) + else: + keywords_en, keywords_fr = None, None + + return coll_id, coll_bbox, time_begin, time_end, coll_links, coll_assets, title_en, title_fr, description_en, description_fr, keywords_en, keywords_fr + +def create_coll_dict(api_root): + response_collection = requests.get(f'{api_root}/collections/') + collection_data_list = response_collection.json().get('collections', []) + + coll_id_dict = { + coll_dict['id']: { + "title": {'en': fields[6], 'fr': fields[7]}, + 'description': {'en': fields[8], 'fr': fields[9]}, + 'keywords': {'en': fields[10], 'fr': fields[11]}, + } + for coll_dict in collection_data_list + for fields in [get_collection_fields(coll_dict)] + } + return coll_id_dict + + +#Item_to_features_properties + + +def item_to_features_properties(params, geocore_features_dict, item_dict, coll_id_dict): + root_name = params['root_name'] + root_id = params['root_id'] + source = params['source'] + status = params['status'] + maintenance = params['maintenance'] + useLimits_en = params['useLimits_en'] + useLimits_fr = params['useLimits_fr'] + spatialRepresentation = params['spatialRepresentation'] + contact = params['contact'] + type_data = params['type_data'] + topicCategory = params['topicCategory'] + properties_dict = geocore_features_dict['properties'] - root_name_en,root_name_fr = root_name.split('/') + # Get item level lelments + item_id, item_bbox, item_links, item_assets, item_properties,coll_id = get_item_fields(item_dict) + + # Get collection level keywords, title, and description + coll_data = coll_id_dict.get(coll_id, {}) + title_en = coll_data.get('title', {}).get('en') + title_fr = coll_data.get('title', {}).get('fr') + description_en = coll_data.get('description', {}).get('en') + description_fr = coll_data.get('description', {}).get('fr') + keywords_en = coll_data.get('keywords', {}).get('en') + keywords_fr = coll_data.get('keywords', {}).get('fr') + #id - properties_dict.update({"id": source + '-root-' + root_id}) - #title = root name - properties_dict['title'].update({"en": 'Root - ' + root_name_en}) - properties_dict['title'].update({"fr": 'Racine - ' + root_name_fr}) - #parentIdentifier: None for STAC catalog - #properties_dict.update({"parentIdentifier": None}) - #date: None for STAC collection - #Type: hardcoded + properties_dict.update({"id": source + '-' + coll_id + '-' + item_id}) + #title + item_date= datetime.strptime(item_properties['datetime'], '%Y-%m-%dT%H:%M:%SZ') + yr = item_date.strftime("%Y") + custom_coll = ["monthly-vegetation-parameters-20m-v1", "hrdem-lidar"] + if title_en != None and title_fr!= None and coll_id not in custom_coll: + update_dict(properties_dict, {'title':{'en':yr + ' - ' + title_en, 'fr':yr + ' - ' + title_fr}}) + elif title_en != None and title_fr!= None and coll_id == "monthly-vegetation-parameters-20m-v1": + # month+coll-title + update_dict(properties_dict, {'title':{'en':item_id.split('-')[-1] + ' - ' + title_en, 'fr':item_id.split('-')[-1] + ' - ' + title_fr}}) + elif title_en != None and title_fr!= None and coll_id == "hrdem-lidar": + # Get the location Seine Rat River from id MB-Seine_Rat_River-1m + match = re.search(r"(?<=-)[A-Za-z_]+", item_id) + if match: + title_header = match.group().replace('_', ' ') + else:# Handle the case where no match is found, e.g., set a default value or raise a custom error + title_header = item_id + update_dict(properties_dict, {'title':{'en':yr + ' ' + title_header + ' - '+ title_en, 'fr':yr + ' ' + title_header + ' - '+ title_fr}}) + + #parentIdentifier + update_dict(properties_dict, {"parentIdentifier": source + '-'+ coll_id}) + + #TemporalExtent + if 'created' in item_properties.keys(): + item_created = item_properties['created'] + properties_dict['date']['published'].update({"text": 'publication; publication'}) + properties_dict['date']['published'].update({"date": item_created}) + properties_dict['date']['created'].update({"text": 'creation; création'}) + properties_dict['date']['created'].update({"date": item_created}) + #temporalExtent: begin is the datatime, hard coded 'Present'as end + properties_dict['temporalExtent'].update({"begin": item_date.strftime("%Y-%m-%d")}) + properties_dict['temporalExtent'].update({"end": 'Present'}) + #options - links_list = links_to_properties_options(links_list=root_links, id=root_id, root_name=root_name, title_en=None, title_fr=None, stac_type='root') - options_list = links_list + links_list = links_to_properties_options(links_list=item_links, id=item_id, root_name=root_name, title_en=title_en, title_fr=title_fr, stac_type='item') + assets_list = assets_to_properties_options(assets_list=item_assets) if item_assets else [] + options_list = links_list+assets_list options_list = [i for n, i in enumerate(options_list) if i not in options_list[n + 1:]] # delete duplicates - # The shared attributes between Items and Collections - #descrption - if root_des!= None: - properties_dict['description'].update({"en": root_des + '.' + disclaimer_en}) - properties_dict['description'].update({"fr": root_des + '.' + disclaimer_fr }) - else: - properties_dict['description'].update({"en": disclaimer_en}) - properties_dict['description'].update({"fr": disclaimer_fr }) - #keywords - properties_dict['keywords'].update({"en": 'SpatioTemporal Asset Catalog, ' + 'stac, '+ source}) - properties_dict['keywords'].update({"fr": 'SpatioTemporal Asset Catalog, ' + 'stac, ' + source}) - # topicCategory - properties_dict.update({"topicCategory": topicCategory}) - properties_dict.update({"type": type_data}) - #geometry - west = round(coll_bbox[0], 2) - south = round(coll_bbox[1],2) - east = round(coll_bbox[2],2) - north = round(coll_bbox[3],2) - geometry_str = "POLYGON((" + str(west) + " " + str(south) +', ' + str(east) +" "+ str(south) + ", " + str(east) +" "+ str(north) + ", " + str(west) +" "+ str(north) + ", " + str(west) +" "+ str(south) + "))" - properties_dict.update({"geometry":geometry_str}) - #Spatialrepresentation - properties_dict.update({"spatialRepresentation":spatialRepresentation}) + description_en_str = f"{description_en or ''} {disclaimer_en}" + description_fr_str = f"{description_fr or ''} {disclaimer_fr}" + keywords_en_str = f"SpatioTemporal Asset Catalog, stac, {keywords_en or ''}" + keywords_fr_str = f"SpatioTemporal Asset Catalog, stac, {keywords_fr or ''}" + + #Geometry + west, south, east, north = [round(coord, 2) for coord in item_bbox] + geometry_str = f"POLYGON(({west} {south}, {east} {south}, {east} {north}, {west} {north}, {west} {south}))" + + # Other properties + update_dict(properties_dict, { + "topicCategory": topicCategory, + "type": type_data, + "spatialRepresentation":spatialRepresentation, + "status":status, + "maintenance":maintenance, + 'useLimits': {'en': useLimits_en, 'fr': useLimits_fr}, + 'contact': contact, + 'options': options_list, + 'description': {'en': description_en_str, 'fr': description_fr_str}, + 'keywords': {'en': keywords_en_str, 'fr': keywords_fr_str}, + "geometry": geometry_str, + + }) + + #skipped: date: None for STAC collection #skipped: refsys, refSys_version - properties_dict.update({"status":status}) - properties_dict.update({"maintenance":maintenance}) - # Skipped metadataStandard, metadataStandardVersion, metadataStandardVersion, graphicOverview, distributionFormat_name, distributionFormat_format - #useLimits - properties_dict['useLimits'].update({"en": useLimits_en}) - properties_dict['useLimits'].update({"fr": useLimits_fr}) + #skipped metadataStandard, metadataStandardVersion, metadataStandardVersion, graphicOverview, distributionFormat_name, distributionFormat_format #skipped: accessConstraints, otherConstraints, dateStamp, dataSetURI, locale,language #skipped: characterSet, environmentDescription,supplementalInformation - # Contact - properties_dict.update({'contact': contact}) - # Skipped: credits, cited, distributor,sourceSystemName + #skipped: credits, cited, distributor,sourceSystemName # options - properties_dict.update({'options': options_list}) - # Hard code end 'Present' - properties_dict['temporalExtent'].update({"end":'Present'}) - properties_dict['temporalExtent'].update({"begin": '0001-01-01'}) return (properties_dict) -def update_geocore_dict(geocore_features_dict, properties_dict,geometry_dict): - """Update the GeoCore geocore_features_dict null template with the updated propoerties dict and geometry dict - :param geocore_features_dict: null template of geocore_features_dict in dict format - :param properties_dict: mapped/updatedd STAC geocore properties dict - :param geometry_dict: mapped/updatedd STAC geocore geometry dict - :return: STAC geocore dict - """ - geocore_features_dict.update({"properties": properties_dict}) - geocore_features_dict.update({"geometry": geometry_dict}) - geocore_updated = { - "type": "FeatureCollection", - "features": [geocore_features_dict] - } - return geocore_updated + +def get_item_fields(item_dict): + """Get the collection fields needed for the geocore mapping + :param item_dict: dictionary of a singel STAC item + """ + item_id = item_dict.get('id') + item_bbox = item_dict.get('bbox') + item_links = item_dict.get('links') + item_assets = item_dict.get('assets') + item_properties = item_dict.get('properties') + coll_id = item_dict.get('collection') + return item_id, item_bbox, item_links, item_assets, item_properties, coll_id;