Skip to content

Commit

Permalink
feat: update batch processing to store historical datasets for all st…
Browse files Browse the repository at this point in the history
…atuses except deprecated (#457)
  • Loading branch information
davidgamez authored Jun 5, 2024
1 parent 70e7195 commit ef48282
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 41 deletions.
6 changes: 3 additions & 3 deletions functions-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ export MY_AWESOME_KEY=MY_AWESOME_VALUE
```

# Unit tests
If a folder `tests` is added to a function's folder, the script `api-test.sh` will execute the tests without any further configuration.
If a folder `tests` is added to a function's folder, the script `api-tests.sh` will execute the tests without any further configuration.
Make sure the testing database is running before executing the tests.
```
docker-compose --env-file ./config/.env.local up -d liquibase-test
```
Execute all tests within the functions-python folder
```
./scripts/api-test.sh --folder functions-python
./scripts/api-tests.sh --folder functions-python
```
Execute test from a specific function
```
./scripts/api-test.sh --folder functions-python/batch_datasets
./scripts/api-tests.sh --folder functions-python/batch_datasets
```
11 changes: 9 additions & 2 deletions functions-python/batch_datasets/.env.rename_me
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
# Environment variables for tokens function to run locally
export FEEDS_DATABASE_URL={{FEEDS_DATABASE_URL}}
# Environment variables for tokens function to run locally. Delete this line after renaming the file.
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/MobilityDatabase
PROJECT_ID=my-project-id
PUBSUB_TOPIC_NAME=my-topic
DATASTORE_DATASET=my-project-id
DATASTORE_EMULATOR_HOST=localhost:8044
DATASTORE_EMULATOR_HOST_PATH=localhost:8044/datastore
DATASTORE_HOST=http://localhost:8044
DATASTORE_PROJECT_ID=my-project-id
34 changes: 34 additions & 0 deletions functions-python/batch_datasets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,37 @@ The function is configured using the following environment variables:
# Local development
The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.

## Test locally with Google Cloud Emulators

- Install the Pub/Sub emulator
```bash
gcloud components install pubsub-emulator
```
- Install the Cloud Datastore emulator
```bash
gcloud components install cloud-datastore-emulator
```

- Execute the following commands to start the emulators:
```bash
gcloud beta emulators pubsub start --project=project-id --host-port='localhost:8043'
gcloud beta emulators datastore start --project=project-id --host-port='localhost:8044'
```
- Start function
```bash
./scripts/function-python-run.sh --function_name batch_datasets
```
- Execute function
```bash
curl http://localhost:8080
```

# Test
- Run the tests
```bash
./scripts/api-tests.sh --folder functions-python/batch_datasets
```
47 changes: 23 additions & 24 deletions functions-python/batch_datasets/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@ def publish(publisher: PublisherClient, topic_path: str, data_bytes: bytes) -> F
return publisher.publish(topic_path, data=data_bytes)


def get_active_feeds(session: Session):
def get_non_deprecated_feeds(session: Session):
"""
Returns a list of active feeds
:return: list of active feeds
    Returns a list of non-deprecated feeds
:return: list of feeds
"""
# Query the feeds database for active feeds with a GTFS dataset and no authentication
# and also feeds without gtfs datasets
# Query the database for Gtfs feeds with status different from deprecated
query = (
session.query(
Gtfsfeed.stable_id,
Expand All @@ -75,20 +74,21 @@ def get_active_feeds(session: Session):
Gtfsfeed.authentication_type,
Gtfsfeed.authentication_info_url,
Gtfsfeed.api_key_parameter_name,
Gtfsfeed.status,
Gtfsdataset.id.label("dataset_id"),
Gtfsdataset.hash.label("dataset_hash"),
)
.select_from(Gtfsfeed)
.outerjoin(Gtfsdataset, (Gtfsdataset.feed_id == Gtfsfeed.id))
.filter(Gtfsfeed.status == "active")
.filter(Gtfsfeed.status != "deprecated")
.filter(or_(Gtfsdataset.id.is_(None), Gtfsdataset.latest.is_(True)))
)
# Limit the query to 10 feeds (or FEEDS_LIMIT param) for testing purposes and lower environments
if os.getenv("ENVIRONMENT", "").lower() in ("dev", "test", "qa"):
limit = os.getenv("FEEDS_LIMIT")
query = query.limit(10 if limit is None else int(limit))
results = query.all()
print(f"Retrieved {len(results)} active feeds.")
print(f"Retrieved {len(results)} feeds.")

return results

Expand All @@ -104,47 +104,46 @@ def batch_datasets(request):
:param request: HTTP request object
:return: HTTP response object
"""

try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
active_feeds = get_active_feeds(session)
feeds = get_non_deprecated_feeds(session)
except Exception as error:
print(f"Error retrieving active feeds: {error}")
raise Exception(f"Error retrieving active feeds: {error}")
print(f"Error retrieving feeds: {error}")
raise Exception(f"Error retrieving feeds: {error}")
finally:
close_db_session(session)

print(f"Retrieved {len(active_feeds)} active feeds.")
print(f"Retrieved {len(feeds)} feeds.")
publisher = get_pubsub_client()
topic_path = publisher.topic_path(project_id, pubsub_topic_name)
trace_id = request.headers.get("X-Cloud-Trace-Context")
execution_id = (
f"batch-trace-{trace_id}" if trace_id else f"batch-uuid-{uuid.uuid4()}"
)
timestamp = datetime.now()
for active_feed in active_feeds:
for feed in feeds:
payload = {
"execution_id": execution_id,
"producer_url": active_feed.producer_url,
"feed_stable_id": active_feed.stable_id,
"feed_id": active_feed.feed_id,
"dataset_id": active_feed.dataset_id,
"dataset_hash": active_feed.dataset_hash,
"authentication_type": active_feed.authentication_type,
"authentication_info_url": active_feed.authentication_info_url,
"api_key_parameter_name": active_feed.api_key_parameter_name,
"producer_url": feed.producer_url,
"feed_stable_id": feed.stable_id,
"feed_id": feed.feed_id,
"dataset_id": feed.dataset_id,
"dataset_hash": feed.dataset_hash,
"authentication_type": feed.authentication_type,
"authentication_info_url": feed.authentication_info_url,
"api_key_parameter_name": feed.api_key_parameter_name,
}
data_str = json.dumps(payload)
print(f"Publishing {data_str} to {topic_path}.")
future = publish(publisher, topic_path, data_str.encode("utf-8"))
future.add_done_callback(
lambda _: publish_callback(future, active_feed["stable_id"], topic_path)
lambda _: publish_callback(future, feed["stable_id"], topic_path)
)
BatchExecutionService().save(
BatchExecution(
execution_id=execution_id,
feeds_total=len(active_feeds),
feeds_total=len(feeds),
timestamp=timestamp,
)
)
return f"Publish completed. Published {len(active_feeds)} feeds to {pubsub_topic_name}."
return f"Publish completed. Published {len(feeds)} feeds to {pubsub_topic_name}."
23 changes: 21 additions & 2 deletions functions-python/batch_datasets/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def populate_database():
"""
Populates the database with fake data with the following distribution:
    - 12 GTFS feeds
- 5 active
- 5 inactive
- 3 active
- 7 inactive
- 2 deprecated
- 5 GTFS Realtime feeds
- 9 GTFS datasets
- 3 active in active feeds
Expand All @@ -52,6 +53,24 @@ def populate_database():
)
session.add(feed)

for i in range(2):
feed = Gtfsfeed(
id=fake.uuid4(),
data_type="gtfs",
feed_name=fake.name(),
note=fake.sentence(),
producer_url=fake.url(),
authentication_type="0" if (i in [0, 1, 2]) else "1",
authentication_info_url=None,
api_key_parameter_name=None,
license_url=fake.url(),
stable_id=fake.uuid4(),
status="deprecated",
feed_contact_email=fake.email(),
provider=fake.company(),
)
session.add(feed)

# GTFS datasets leaving one active feed without a dataset
active_gtfs_feeds = session.query(Gtfsfeed).all()
for i in range(1, 9):
Expand Down
19 changes: 9 additions & 10 deletions functions-python/batch_datasets/tests/test_batch_datasets_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
from unittest import mock
import pytest
from unittest.mock import Mock, patch, MagicMock
from batch_datasets.src.main import get_active_feeds, batch_datasets
from batch_datasets.src.main import get_non_deprecated_feeds, batch_datasets
from test_utils.database_utils import get_testing_session, default_db_url


def test_get_active_feeds():
def test_get_non_deprecated_feeds():
with get_testing_session() as session:
active_feeds = get_active_feeds(session)
assert len(active_feeds) == 3
# assert all active feeds has authentication_type == '0'
for feed in active_feeds:
assert feed.authentication_type == "0"
feeds = get_non_deprecated_feeds(session)
assert len(feeds) == 10
assert len([feed for feed in feeds if feed.status == "active"]) == 3
assert len([feed for feed in feeds if feed.status == "inactive"]) == 7


@mock.patch.dict(
Expand All @@ -45,23 +44,23 @@ def test_get_active_feeds():
def test_batch_datasets(mock_client, mock_publish):
mock_client.return_value = MagicMock()
with get_testing_session() as session:
active_feeds = get_active_feeds(session)
feeds = get_non_deprecated_feeds(session)
with patch(
"dataset_service.main.BatchExecutionService.__init__", return_value=None
):
with patch(
"dataset_service.main.BatchExecutionService.save", return_value=None
):
batch_datasets(Mock())
assert mock_publish.call_count == 3
assert mock_publish.call_count == 5
                # loop over mock_publish.call_args_list and check that the stable_id of the feed is in the list of
                # non-deprecated feeds
for i in range(3):
message = json.loads(
mock_publish.call_args_list[i][0][2].decode("utf-8")
)
assert message["feed_stable_id"] in [
feed.stable_id for feed in active_feeds
feed.stable_id for feed in feeds
]


Expand Down
1 change: 1 addition & 0 deletions scripts/function-python-run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/bin/bash
#
#
# MobilityData 2023
Expand Down

0 comments on commit ef48282

Please sign in to comment.