From ef48282d67840363badd4b170271c60c30c6d4bc Mon Sep 17 00:00:00 2001
From: David Gamez <1192523+davidgamez@users.noreply.github.com>
Date: Wed, 5 Jun 2024 16:46:07 -0400
Subject: [PATCH] feat: update batch processing to store historical datasets
 for all statuses except deprecated (#457)

---
 functions-python/README.md                    |  6 +--
 .../batch_datasets/.env.rename_me             | 11 ++++-
 functions-python/batch_datasets/README.md     | 34 ++++++++++++++
 functions-python/batch_datasets/src/main.py   | 47 +++++++++----------
 .../batch_datasets/tests/conftest.py          | 23 ++++++++-
 .../tests/test_batch_datasets_main.py         | 19 ++++----
 scripts/function-python-run.sh                |  1 +
 7 files changed, 100 insertions(+), 41 deletions(-)

diff --git a/functions-python/README.md b/functions-python/README.md
index 208f4af80..e6e1918b7 100644
--- a/functions-python/README.md
+++ b/functions-python/README.md
@@ -58,16 +58,16 @@ export MY_AWESOME_KEY=MY_AWESOME_VALUE
 ```
 
 # Unit tests
-If a folder `tests` is added to a function's folder, the script `api-test.sh` will execute the tests without any further configuration.
+If a folder `tests` is added to a function's folder, the script `api-tests.sh` will execute the tests without any further configuration.
 Make sure the testing database is running before executing the tests.
 ```
 docker-compose --env-file ./config/.env.local up -d liquibase-test
 ```
 Execute all tests within the functions-python folder
 ```
-./scripts/api-test.sh --folder functions-python
+./scripts/api-tests.sh --folder functions-python
 ```
-Execute test from a specific function
+Execute tests for a specific function
 ```
-./scripts/api-test.sh --folder functions-python/batch_datasets
+./scripts/api-tests.sh --folder functions-python/batch_datasets
 ```
\ No newline at end of file
diff --git a/functions-python/batch_datasets/.env.rename_me b/functions-python/batch_datasets/.env.rename_me
index 7b17cae57..2a727fee1 100644
--- a/functions-python/batch_datasets/.env.rename_me
+++ b/functions-python/batch_datasets/.env.rename_me
@@ -1,2 +1,9 @@
-# Environment variables for tokens function to run locally
-export FEEDS_DATABASE_URL={{FEEDS_DATABASE_URL}}
\ No newline at end of file
+# Environment variables for the batch_datasets function to run locally. Delete this line after renaming the file.
+FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/MobilityDatabase
+PROJECT_ID=my-project-id
+PUBSUB_TOPIC_NAME=my-topic
+DATASTORE_DATASET=my-project-id
+DATASTORE_EMULATOR_HOST=localhost:8044
+DATASTORE_EMULATOR_HOST_PATH=localhost:8044/datastore
+DATASTORE_HOST=http://localhost:8044
+DATASTORE_PROJECT_ID=my-project-id
diff --git a/functions-python/batch_datasets/README.md b/functions-python/batch_datasets/README.md
index 4485a70c9..b1da5172d 100644
--- a/functions-python/batch_datasets/README.md
+++ b/functions-python/batch_datasets/README.md
@@ -30,3 +30,37 @@ The function is configured using the following environment variables:
 
 # Local development
 The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.
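+
+As a quick local-setup sketch (assuming a bash shell and the `.env.rename_me` template above; adjust paths to your checkout), the variables can be copied into place and exported before running the function:
+```bash
+cp .env.rename_me .env           # work on a copy, then delete its first (comment) line
+set -a && source .env && set +a  # auto-export every KEY=VALUE pair into the shell
+```
+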
+## Test locally with Google Cloud Emulators
+
+- Install the Pub/Sub emulator
+```bash
+gcloud components install pubsub-emulator
+```
+- Install the Cloud Datastore emulator
+```bash
+gcloud components install cloud-datastore-emulator
+```
+
+- Start the emulators (each command blocks, so run them in separate terminals):
+```bash
+  gcloud beta emulators pubsub start --project=project-id --host-port='localhost:8043'
+  gcloud beta emulators datastore start --project=project-id --host-port='localhost:8044'
+```
+- Start the function
+```bash
+  ./scripts/function-python-run.sh --function_name batch_datasets
+```
+- Invoke the function
+```bash
+  curl http://localhost:8080
+```
+
+# Test
+- Run the tests
+```bash
+  ./scripts/api-tests.sh --folder functions-python/batch_datasets
+```
\ No newline at end of file
diff --git a/functions-python/batch_datasets/src/main.py b/functions-python/batch_datasets/src/main.py
index 9f1afadf0..cd33bb9e0 100644
--- a/functions-python/batch_datasets/src/main.py
+++ b/functions-python/batch_datasets/src/main.py
@@ -60,13 +60,12 @@ def publish(publisher: PublisherClient, topic_path: str, data_bytes: bytes) -> F
     return publisher.publish(topic_path, data=data_bytes)
 
 
-def get_active_feeds(session: Session):
+def get_non_deprecated_feeds(session: Session):
     """
-    Returns a list of active feeds
-    :return: list of active feeds
+    Returns a list of non-deprecated feeds
+    :return: list of non-deprecated feeds
     """
-    # Query the feeds database for active feeds with a GTFS dataset and no authentication
-    # and also feeds without gtfs datasets
+    # Query the database for GTFS feeds whose status is anything but "deprecated"
     query = (
         session.query(
             Gtfsfeed.stable_id,
@@ -75,12 +74,13 @@ def get_active_feeds(session: Session):
             Gtfsfeed.producer_url,
             Gtfsfeed.feed_id,
             Gtfsfeed.authentication_type,
             Gtfsfeed.authentication_info_url,
             Gtfsfeed.api_key_parameter_name,
+            Gtfsfeed.status,
             Gtfsdataset.id.label("dataset_id"),
             Gtfsdataset.hash.label("dataset_hash"),
         )
         .select_from(Gtfsfeed)
         .outerjoin(Gtfsdataset, (Gtfsdataset.feed_id == Gtfsfeed.id))
-        .filter(Gtfsfeed.status == "active")
+        .filter(Gtfsfeed.status != "deprecated")
         .filter(or_(Gtfsdataset.id.is_(None), Gtfsdataset.latest.is_(True)))
     )
     # Limit the query to 10 feeds (or FEEDS_LIMIT param) for testing purposes and lower environments
@@ -88,7 +88,7 @@
     limit = os.getenv("FEEDS_LIMIT")
     query = query.limit(10 if limit is None else int(limit))
     results = query.all()
-    print(f"Retrieved {len(results)} active feeds.")
+    print(f"Retrieved {len(results)} feeds.")
     return results
 
 
@@ -104,17 +104,16 @@ def batch_datasets(request):
     :param request: HTTP request object
     :return: HTTP response object
     """
-
     try:
         session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
-        active_feeds = get_active_feeds(session)
+        feeds = get_non_deprecated_feeds(session)
     except Exception as error:
-        print(f"Error retrieving active feeds: {error}")
-        raise Exception(f"Error retrieving active feeds: {error}")
+        print(f"Error retrieving feeds: {error}")
+        raise Exception(f"Error retrieving feeds: {error}")
     finally:
         close_db_session(session)
-    print(f"Retrieved {len(active_feeds)} active feeds.")
+    print(f"Retrieved {len(feeds)} feeds.")
     publisher = get_pubsub_client()
     topic_path = publisher.topic_path(project_id, pubsub_topic_name)
     trace_id = request.headers.get("X-Cloud-Trace-Context")
     execution_id = (
         f"batch-trace-{trace_id}" if trace_id else f"batch-uuid-{uuid.uuid4()}"
     )
     timestamp = datetime.now()
-    for active_feed in active_feeds:
+    for feed in feeds:
         payload = {
             "execution_id": execution_id,
-            "producer_url": active_feed.producer_url,
-            "feed_stable_id": active_feed.stable_id,
-            "feed_id": active_feed.feed_id,
-            "dataset_id": active_feed.dataset_id,
-            "dataset_hash": active_feed.dataset_hash,
-            "authentication_type": active_feed.authentication_type,
-            "authentication_info_url": active_feed.authentication_info_url,
-            "api_key_parameter_name": active_feed.api_key_parameter_name,
+            "producer_url": feed.producer_url,
+            "feed_stable_id": feed.stable_id,
+            "feed_id": feed.feed_id,
+            "dataset_id": feed.dataset_id,
+            "dataset_hash": feed.dataset_hash,
+            "authentication_type": feed.authentication_type,
+            "authentication_info_url": feed.authentication_info_url,
+            "api_key_parameter_name": feed.api_key_parameter_name,
         }
         data_str = json.dumps(payload)
         print(f"Publishing {data_str} to {topic_path}.")
         future = publish(publisher, topic_path, data_str.encode("utf-8"))
         future.add_done_callback(
-            lambda _: publish_callback(future, active_feed["stable_id"], topic_path)
+            # bind this iteration's stable_id as a default argument so the callback
+            # does not late-bind the loop variable (and use attribute access, not
+            # Row indexing)
+            lambda f, sid=feed.stable_id: publish_callback(f, sid, topic_path)
         )
     BatchExecutionService().save(
         BatchExecution(
             execution_id=execution_id,
-            feeds_total=len(active_feeds),
+            feeds_total=len(feeds),
             timestamp=timestamp,
         )
     )
-    return f"Publish completed. Published {len(active_feeds)} feeds to {pubsub_topic_name}."
+    return f"Publish completed. Published {len(feeds)} feeds to {pubsub_topic_name}."
diff --git a/functions-python/batch_datasets/tests/conftest.py b/functions-python/batch_datasets/tests/conftest.py
index fab2e0aaa..a20eba8f2 100644
--- a/functions-python/batch_datasets/tests/conftest.py
+++ b/functions-python/batch_datasets/tests/conftest.py
@@ -25,8 +25,9 @@ def populate_database():
     """
     Populates the database with fake data with the following distribution:
-    - 10 GTFS feeds
-        - 5 active
-        - 5 inactive
+    - 12 GTFS feeds
+        - 3 active
+        - 7 inactive
+        - 2 deprecated
     - 5 GTFS Realtime feeds
     - 9 GTFS datasets
         - 3 active in active feeds
@@ -52,6 +53,24 @@
         )
         session.add(feed)
 
+    for i in range(2):
+        feed = Gtfsfeed(
+            id=fake.uuid4(),
+            data_type="gtfs",
+            feed_name=fake.name(),
+            note=fake.sentence(),
+            producer_url=fake.url(),
+            authentication_type="0",
+            authentication_info_url=None,
+            api_key_parameter_name=None,
+            license_url=fake.url(),
+            stable_id=fake.uuid4(),
+            status="deprecated",
+            feed_contact_email=fake.email(),
+            provider=fake.company(),
+        )
+        session.add(feed)
+
     # GTFS datasets leaving one active feed without a dataset
     active_gtfs_feeds = session.query(Gtfsfeed).all()
     for i in range(1, 9):
diff --git a/functions-python/batch_datasets/tests/test_batch_datasets_main.py b/functions-python/batch_datasets/tests/test_batch_datasets_main.py
index 4d91b23c8..b8423f8a1 100644
--- a/functions-python/batch_datasets/tests/test_batch_datasets_main.py
+++ b/functions-python/batch_datasets/tests/test_batch_datasets_main.py
@@ -18,17 +18,16 @@
 from unittest import mock
 import pytest
 from unittest.mock import Mock, patch, MagicMock
-from batch_datasets.src.main import get_active_feeds, batch_datasets
+from batch_datasets.src.main import get_non_deprecated_feeds, batch_datasets
 from test_utils.database_utils import get_testing_session, default_db_url
 
 
-def test_get_active_feeds():
+def test_get_non_deprecated_feeds():
     with get_testing_session() as session:
-        active_feeds = get_active_feeds(session)
-        assert len(active_feeds) == 3
-        # assert all active feeds has authentication_type == '0'
-        for feed in active_feeds:
-            assert feed.authentication_type == "0"
"0" + feeds = get_non_deprecated_feeds(session) + assert len(feeds) == 10 + assert len([feed for feed in feeds if feed.status == "active"]) == 3 + assert len([feed for feed in feeds if feed.status == "inactive"]) == 7 @mock.patch.dict( @@ -45,7 +44,7 @@ def test_get_active_feeds(): def test_batch_datasets(mock_client, mock_publish): mock_client.return_value = MagicMock() with get_testing_session() as session: - active_feeds = get_active_feeds(session) + feeds = get_non_deprecated_feeds(session) with patch( "dataset_service.main.BatchExecutionService.__init__", return_value=None ): @@ -53,7 +52,7 @@ def test_batch_datasets(mock_client, mock_publish): "dataset_service.main.BatchExecutionService.save", return_value=None ): batch_datasets(Mock()) - assert mock_publish.call_count == 3 + assert mock_publish.call_count == 5 # loop over mock_publish.call_args_list and check that the stable_id of the feed is in the list of # active feeds for i in range(3): @@ -61,7 +60,7 @@ def test_batch_datasets(mock_client, mock_publish): mock_publish.call_args_list[i][0][2].decode("utf-8") ) assert message["feed_stable_id"] in [ - feed.stable_id for feed in active_feeds + feed.stable_id for feed in feeds ] diff --git a/scripts/function-python-run.sh b/scripts/function-python-run.sh index 32095a895..bded1e806 100755 --- a/scripts/function-python-run.sh +++ b/scripts/function-python-run.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # # MobilityData 2023