diff --git a/functions-python/README.md b/functions-python/README.md index 53f7abe08..c94f41a71 100644 --- a/functions-python/README.md +++ b/functions-python/README.md @@ -32,6 +32,24 @@ The function configuration file contains the following properties: - `min_instance_count`: The minimum number of function instances that can be created in response to a load. - `available_cpu_count`: The number of CPU cores that are available to the function. +# Local Setup + +## Requirements +The requirements to run the functions locally might differ depending on the Google Cloud dependencies. Please refer to each function to make sure all the requirements are met. + +## Install the Google Cloud SDK +To be able to run the functions locally, the Google Cloud SDK should be installed. Please refer to the [Google Cloud SDK documentation](https://cloud.google.com/sdk/docs/install) for more information. + +## Install the Google Cloud Emulators + +```bash +gcloud components install cloud-datastore-emulator +``` + +- Install the Pub/Sub emulator +```bash +gcloud components install pubsub-emulator +``` # Useful scripts - To locally execute a function use the following command: diff --git a/functions-python/feed_sync_dispatcher_transitland/.coveragerc b/functions-python/feed_sync_dispatcher_transitland/.coveragerc new file mode 100644 index 000000000..89dac199f --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/.coveragerc @@ -0,0 +1,10 @@ +[run] +omit = + */test*/* + */database_gen/* + */dataset_service/* + */helpers/* + +[report] +exclude_lines = + if __name__ == .__main__.: \ No newline at end of file diff --git a/functions-python/feed_sync_dispatcher_transitland/.env.rename_me b/functions-python/feed_sync_dispatcher_transitland/.env.rename_me new file mode 100644 index 000000000..3250ba24d --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/.env.rename_me @@ -0,0 +1,5 @@ +# Environment variables for the feed_sync_dispatcher_transitland function to run locally. 
Delete this line after renaming the file. +FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/MobilityDatabase +PROJECT_ID=my-project-id +PUBSUB_TOPIC_NAME=my-topic +TRANSITLAND_API_KEY=your-api-key diff --git a/functions-python/feed_sync_dispatcher_transitland/README.md b/functions-python/feed_sync_dispatcher_transitland/README.md new file mode 100644 index 000000000..303f1ef35 --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/README.md @@ -0,0 +1,79 @@ +# Feed Sync Dispatcher for Transitland +This directory contains the GCP serverless function that triggers the synchronization of Transitland feeds. +The function publishes one Pub/Sub message per Transitland feed to be synced. +```json + { + "message": { + "data": + { + "external_id": "feeds_onestop_id", + "feed_id": "feed_id", + "execution_id": "execution_id", + "feed_url": "feed_url", + "spec": "spec", + "auth_info_url": "auth_info_url", + "auth_param_name": "auth_param_name", + "type": "type", + "operator_name": "operator_name", + "country": "country", + "state_province": "state_province", + "city_name": "city_name", + "source": "TLD", + "payload_type": "payload_type" + } + } + } +``` + +# Function configuration +The function is configured using the following environment variables: +- `PUBSUB_TOPIC_NAME`: The name of the Pub/Sub topic to publish the messages to. +- `PROJECT_ID`: The GCP Project id. +- `TRANSITLAND_API_KEY`: The Transitland API key (secret). + +# Local development +The local development of this function follows the same steps as the other functions. + +Install the Google Pub/Sub emulator; please refer to the [README.md](../README.md) file for more information. 
+ +## Python requirements + +- Install the requirements +```bash + pip install -r ./functions-python/feed_sync_dispatcher_transitland/requirements.txt +``` + +## Test locally with Google Cloud Emulators + +- Execute the following commands to start the emulators: +```bash + gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043' +``` + +- Create a Pub/Sub topic in the emulator: +```bash + curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland" +``` + +- Start function +```bash + export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_dispatcher_transitland +``` + +- [Optional]: Create a local subscription to print published messages: +```bash +./scripts/pubsub_message_print.sh feed-sync-transitland +``` + +- Execute function +```bash + curl http://localhost:8080 +``` + +- To run/debug from your IDE use the file `main_local_debug.py` + +# Test +- Run the tests +```bash + ./scripts/api-tests.sh --folder functions-python/feed_sync_dispatcher_transitland +``` diff --git a/functions-python/feed_sync_dispatcher_transitland/function_config.json b/functions-python/feed_sync_dispatcher_transitland/function_config.json new file mode 100644 index 000000000..99554a359 --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/function_config.json @@ -0,0 +1,19 @@ +{ + "name": "feed-sync-dispatcher-transitland", + "description": "Feed Sync Dispatcher for Transitland", + "entry_point": "feed_sync_dispatcher_transitland", + "timeout": 540, + "memory": "512Mi", + "trigger_http": true, + "include_folders": ["database_gen", "helpers"], + "secret_environment_variables": [ + { + "key": "FEEDS_DATABASE_URL" + } + ], + "ingress_settings": "ALLOW_INTERNAL_AND_GCLB", + "max_instance_request_concurrency": 20, + "max_instance_count": 10, + "min_instance_count": 0, + "available_cpu": 1 +} diff --git a/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py 
# Code to be able to debug locally without affecting the runtime cloud function


# Requirements:
# - Google Cloud SDK installed
# - Environment variables set in the `.env.local_test` file loaded below
#   (`.env.rename_me` in this folder is the template for that file)
# - Local database in running state
# - Follow the instructions in the README.md file
#
# Usage:
# - python feed_sync_dispatcher_transitland/main_local_debug.py

from dotenv import load_dotenv

# Load environment variables from .env.local_test BEFORE importing src.main:
# that module reads its configuration (PROJECT_ID, PUBSUB_TOPIC_NAME,
# TRANSITLAND_* ...) with os.getenv() at import time, so loading the file
# after the import would leave every setting unset.
load_dotenv(dotenv_path=".env.local_test")

from src.main import feed_sync_dispatcher_transitland  # noqa: E402

if __name__ == "__main__":

    class RequestObject:
        """Minimal stand-in for the HTTP request: the function only reads `headers`."""

        def __init__(self, headers):
            self.headers = headers

    request = RequestObject({"X-Cloud-Trace-Context": "1234567890abcdef"})
    feed_sync_dispatcher_transitland(request)
b/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt @@ -0,0 +1,2 @@ +Faker +pytest~=7.4.3 \ No newline at end of file diff --git a/functions-python/feed_sync_dispatcher_transitland/src/__init__.py b/functions-python/feed_sync_dispatcher_transitland/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/feed_sync_dispatcher_transitland/src/main.py b/functions-python/feed_sync_dispatcher_transitland/src/main.py new file mode 100644 index 000000000..90592f725 --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/src/main.py @@ -0,0 +1,358 @@ +# +# MobilityData 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#

import json
import os
import logging
import time
import random
from dataclasses import dataclass, asdict
from typing import Optional, List
import requests
from requests.exceptions import RequestException, HTTPError
import pandas as pd

import functions_framework
from google.cloud.pubsub_v1.futures import Future
from sqlalchemy.orm import Session
from sqlalchemy import text

from helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload
from helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher
from helpers.pub_sub import get_pubsub_client, get_execution_id

# Logging configuration
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Environment variables.
# All of them are read once, at import time. TRANSITLAND_FEED_URL and
# TRANSITLAND_OPERATOR_URL must be set as well: when they are missing,
# get_data() receives None as URL and returns no data at all.
PUBSUB_TOPIC_NAME = os.getenv("PUBSUB_TOPIC_NAME")
PROJECT_ID = os.getenv("PROJECT_ID")
FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")
apikey = os.getenv("TRANSITLAND_API_KEY")
TRANSITLAND_OPERATOR_URL = os.getenv("TRANSITLAND_OPERATOR_URL")
TRANSITLAND_FEED_URL = os.getenv("TRANSITLAND_FEED_URL")
spec = ["gtfs", "gtfs-rt"]

# session instance to reuse connections
session = requests.Session()


@dataclass
class TransitFeedSyncPayload:
    """
    Data class for transit feed sync payloads.

    One instance is built per Transitland feed that has to be created or
    updated; the dispatcher serialises it to JSON and publishes it to Pub/Sub.
    """

    # Transitland onestop id of the feed.
    external_id: str
    # Transitland feed id.
    feed_id: str
    feed_url: Optional[str] = None
    execution_id: Optional[str] = None
    spec: Optional[str] = None
    auth_info_url: Optional[str] = None
    auth_param_name: Optional[str] = None
    type: Optional[str] = None
    # Comma separated names of the operators attached to the feed.
    operator_name: Optional[str] = None
    country: Optional[str] = None
    state_province: Optional[str] = None
    city_name: Optional[str] = None
    # Origin of the feed; this function always sets "TLD".
    source: Optional[str] = None
    # "new" or "update".
    payload_type: Optional[str] = None

    def to_dict(self):
        """Returns the payload as a plain dictionary."""
        return asdict(self)

    def to_json(self):
        """Returns the payload serialised as a JSON string."""
        return json.dumps(self.to_dict())


class TransitFeedSyncProcessor(FeedSyncProcessor):
    """
    Feed sync processor for the Transitland API: downloads feeds and operators,
    merges them and builds the payloads for feeds that are new or whose URL
    differs from the one stored in the database.
    """

    def check_url_status(self, url: str) -> bool:
        """
        Checks if a URL returns a valid response (not 404 or 500).
        :param url: URL to probe with a HEAD request
        :return: False when the status is 404/500 or the request fails, True otherwise
        """
        try:
            response = requests.head(url, timeout=25)
            return response.status_code not in {404, 500}
        except requests.RequestException as e:
            logging.warning(f"Failed to reach {url}: {e}")
            return False

    def process_sync(
        self, db_session: Optional[Session] = None, execution_id: Optional[str] = None
    ) -> List[FeedSyncPayload]:
        """
        Process data synchronously to fetch, extract, combine, filter and prepare payloads for publishing
        to a queue based on conditions related to the data retrieved from TransitLand API.
        :param db_session: database session used to look up already known feeds
        :param execution_id: execution ID copied into every payload
        :return: list of FeedSyncPayload, one per feed that is new or has a changed URL
        """
        feeds_data = self.get_data(TRANSITLAND_FEED_URL, apikey, spec, session)
        operators_data = self.get_data(
            TRANSITLAND_OPERATOR_URL, apikey, session=session
        )

        feeds = self.extract_feeds_data(feeds_data)
        operators = self.extract_operators_data(operators_data)

        # An empty list produces a DataFrame without columns and the merge
        # below would raise a KeyError; there is nothing to sync in that case.
        if not feeds or not operators:
            logging.warning(
                "No feeds or no operators retrieved from Transitland. Nothing to sync."
            )
            return []

        # Converts operators and feeds to pandas DataFrames
        operators_df = pd.DataFrame(operators)
        feeds_df = pd.DataFrame(feeds)

        # Merge operators and feeds DataFrames on 'operator_feed_id' and 'feed_id'
        combined_df = pd.merge(
            operators_df,
            feeds_df,
            left_on="operator_feed_id",
            right_on="feed_id",
            how="inner",
        )

        # Filtered out rows where 'feed_url' is missing
        combined_df = combined_df[combined_df["feed_url"].notna()]
        if combined_df.empty:
            return []

        # Group by 'feed_id' and concatenate 'operator_name' while keeping first values of other columns
        df_grouped = (
            combined_df.groupby("feed_id")
            .agg(
                {
                    # Operators without a name are skipped: joining None raises a TypeError
                    "operator_name": lambda names: ", ".join(
                        names.dropna().astype(str)
                    ),
                    "feeds_onestop_id": "first",
                    "feed_url": "first",
                    "operator_feed_id": "first",
                    "spec": "first",
                    "country": "first",
                    "state_province": "first",
                    "city_name": "first",
                    "auth_info_url": "first",
                    "auth_param_name": "first",
                    "type": "first",
                }
            )
            .reset_index()
        )

        # Filtered out unwanted countries
        countries_not_included = ["France", "Japan"]
        filtered_df = df_grouped[
            ~df_grouped["country"]
            .str.lower()
            .isin([c.lower() for c in countries_not_included])
        ]

        # Filtered out URLs that return undesired status codes
        filtered_df = filtered_df[filtered_df["feed_url"].apply(self.check_url_status)]

        # Convert filtered DataFrame to dictionary format
        combined_data = filtered_df.to_dict(orient="records")

        payloads = []
        for data in combined_data:
            external_id = data["feeds_onestop_id"]
            feed_url = data["feed_url"]
            source = "TLD"

            if not self.check_external_id(db_session, external_id, source):
                payload_type = "new"
            else:
                mbd_feed_url = self.get_mbd_feed_url(db_session, external_id, source)
                if mbd_feed_url != feed_url:
                    payload_type = "update"
                else:
                    # Known feed with an unchanged URL: nothing to publish
                    continue

            # prepare payload
            payload = TransitFeedSyncPayload(
                external_id=external_id,
                feed_id=data["feed_id"],
                execution_id=execution_id,
                feed_url=feed_url,
                spec=data["spec"],
                auth_info_url=data["auth_info_url"],
                auth_param_name=data["auth_param_name"],
                type=data["type"],
                operator_name=data["operator_name"],
                country=data["country"],
                state_province=data["state_province"],
                city_name=data["city_name"],
                source=source,
                payload_type=payload_type,
            )
            payloads.append(FeedSyncPayload(external_id=external_id, payload=payload))

        return payloads

    def get_data(
        self,
        url,
        apikey,
        spec=None,
        session=None,
        max_retries=3,
        initial_delay=60,
        max_delay=120,
    ):
        """
        This function retrieves data from the specified Transitland feeds and operator endpoints.
        Handles rate limits, retries, and error cases.
        Returns the parsed data as a dictionary containing feeds and operators.
        :param url: first page to request; following pages come from `meta.next`
        :param apikey: Transitland API key, sent in the `apikey` header
        :param spec: optional value for the `spec` query parameter
        :param session: object exposing `get` (e.g. requests.Session); the `requests` module is used when None
        :param max_retries: attempts per page before giving up (at least one attempt is made)
        :param initial_delay: seconds to wait before the first retry
        :param max_delay: upper bound, in seconds, of the rate-limit back-off
        :return: {"feeds": [...], "operators": [...]} with everything collected before a page
                 exhausted its attempts
        """
        headers = {"apikey": apikey}
        params = {"spec": spec} if spec else {}
        all_data = {"feeds": [], "operators": []}
        delay = initial_delay
        http = session if session is not None else requests
        # With zero attempts the page loop below would never make progress.
        attempts = max(1, max_retries)

        while url:
            for attempt in range(attempts):
                try:
                    response = http.get(url, headers=headers, params=params, timeout=30)
                    response.raise_for_status()
                    data = response.json()
                    all_data["feeds"].extend(data.get("feeds", []))
                    all_data["operators"].extend(data.get("operators", []))
                    url = data.get("meta", {}).get("next")
                    delay = initial_delay
                    break

                except (RequestException, HTTPError) as e:
                    logging.error("Attempt %s failed: %s", attempt + 1, e)
                    # Give up on the last attempt whatever the cause, rate limit
                    # included; otherwise the same page would be retried forever.
                    if attempt == attempts - 1:
                        logging.error(
                            "Failed to fetch data after %s attempts.", max_retries
                        )
                        return all_data

                    # Take the status from the exception: when the request itself
                    # failed (timeout, connection error) no response was assigned
                    # in this iteration. Do not test the response for truthiness,
                    # requests.Response is falsy for 4xx/5xx.
                    failed_response = getattr(e, "response", None)
                    status_code = getattr(failed_response, "status_code", None)
                    if status_code == 429:
                        logging.warning("Rate limit hit. Waiting for %s seconds", delay)
                        time.sleep(delay + random.uniform(0, 1))
                        delay = min(delay * 2, max_delay)
                    else:
                        time.sleep(delay)
        return all_data

    def extract_feeds_data(self, feeds_data: dict) -> List[dict]:
        """
        This function extracts relevant data from the Transitland feeds endpoint containing feeds information.
        Returns a list of dictionaries representing each feed.
        :param feeds_data: dictionary with a "feeds" list, as returned by get_data
        """
        feeds = []
        for feed in feeds_data.get("feeds", []):
            # Both sections are treated as empty when missing or null
            urls = feed.get("urls") or {}
            authorization = feed.get("authorization") or {}
            feeds.append(
                {
                    "feed_id": feed["id"],
                    "feed_url": urls.get("static_current"),
                    "spec": feed["spec"].lower(),
                    "feeds_onestop_id": feed["onestop_id"],
                    "auth_info_url": authorization.get("info_url"),
                    "auth_param_name": authorization.get("param_name"),
                    "type": authorization.get("type"),
                }
            )
        return feeds

    def extract_operators_data(self, operators_data: dict) -> List[dict]:
        """
        This function extracts relevant data from the Transitland operators endpoint.
        Constructs a list of dictionaries containing information about each operator.
        Operators whose first agency has no places are skipped.
        :param operators_data: dictionary with an "operators" list, as returned by get_data
        """
        operators = []
        for operator in operators_data.get("operators", []):
            agencies = operator.get("agencies")
            if not agencies or not agencies[0].get("places"):
                continue

            places = agencies[0]["places"]
            # The second place is used when there is more than one, else the only one
            place = places[1] if len(places) > 1 else places[0]

            operators.append(
                {
                    "operator_name": operator.get("name"),
                    "operator_feed_id": operator["feeds"][0]["id"]
                    if operator.get("feeds")
                    else None,
                    "country": place.get("adm0_name") if place else None,
                    "state_province": place.get("adm1_name") if place else None,
                    "city_name": place.get("city_name") if place else None,
                }
            )
        return operators

    def check_external_id(
        self, db_session: Session, external_id: str, source: str
    ) -> bool:
        """
        Checks if the external_id exists in the public.externalid table for the given source.
        :param db_session: SQLAlchemy session
        :param external_id: The external_id (feeds_onestop_id) to check
        :param source: The source to filter by (e.g., 'TLD' for TransitLand)
        :return: True if the feed exists, False otherwise
        """
        query = text(
            "SELECT 1 FROM public.externalid WHERE associated_id = :external_id AND source = :source LIMIT 1"
        )
        result = db_session.execute(
            query, {"external_id": external_id, "source": source}
        ).fetchone()
        return result is not None

    def get_mbd_feed_url(
        self, db_session: Session, external_id: str, source: str
    ) -> Optional[str]:
        """
        Retrieves the feed_url from the public.feed table in the mbd for the given external_id.
        :param db_session: SQLAlchemy session
        :param external_id: The external_id (feeds_onestop_id) from TransitLand
        :param source: The source to filter by (e.g., 'TLD' for TransitLand)
        :return: feed_url in mbd if exists, otherwise None
        """
        query = text(
            """
            SELECT f.producer_url
            FROM public.feed f
            JOIN public.externalid e ON f.id = e.feed_id
            WHERE e.associated_id = :external_id AND e.source = :source
            LIMIT 1
        """
        )
        result = db_session.execute(
            query, {"external_id": external_id, "source": source}
        ).fetchone()
        return result[0] if result else None

    def publish_callback(
        self, future: Future, payload: FeedSyncPayload, topic_path: str
    ):
        """
        Callback function for when the message is published to Pub/Sub.
        This function logs the result of the publishing operation.
        :param future: completed publish future
        :param payload: payload that was published
        :param topic_path: Pub/Sub topic path the payload was sent to
        """
        error = future.exception()
        if error:
            logging.error(
                f"Error publishing transit land feed {payload.external_id} "
                f"to Pub/Sub topic {topic_path}: {error}"
            )
        else:
            logging.info(f"Published transit land feed {payload.external_id}.")


@functions_framework.http
def feed_sync_dispatcher_transitland(request):
    """
    HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed.
    :param request: HTTP request; only its headers are read, to derive the execution id
    :return: confirmation message
    """
    publisher = get_pubsub_client()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME)
    transit_land_feed_sync_processor = TransitFeedSyncProcessor()
    execution_id = get_execution_id(request, "feed-sync-dispatcher")
    feed_sync_dispatcher(transit_land_feed_sync_processor, topic_path, execution_id)
    return "Feed sync dispatcher executed successfully."
import pytest
from unittest.mock import Mock, patch
from requests import HTTPError
from requests import Session as RequestsSession
from sqlalchemy.orm import Session as DBSession
from feed_sync_dispatcher_transitland.src.main import TransitFeedSyncProcessor
import pandas as pd


@pytest.fixture
def processor():
    """Fresh processor for every test, so per-test mocks never leak."""
    return TransitFeedSyncProcessor()


@patch("feed_sync_dispatcher_transitland.src.main.requests.Session.get")
def test_get_data(mock_get, processor):
    """A single successful page is returned as-is."""
    mock_response = Mock()
    mock_response.json.return_value = {
        "feeds": [
            {
                "id": "feed1",
                "urls": {"static_current": "http://example.com/feed1"},
                "spec": "gtfs",
                "onestop_id": "onestop1",
                "authorization": {},
            }
        ],
        "operators": [],
    }
    mock_response.status_code = 200
    mock_get.return_value = mock_response

    result = processor.get_data(
        "https://api.transit.land", "dummy_api_key", session=RequestsSession()
    )
    assert "feeds" in result
    assert result["feeds"][0]["id"] == "feed1"


@patch("feed_sync_dispatcher_transitland.src.main.time.sleep")
@patch("feed_sync_dispatcher_transitland.src.main.requests.Session.get")
def test_get_data_rate_limit(mock_get, mock_sleep, processor):
    """A 429 answer is waited out (sleep is mocked) and the request retried."""
    # A bare Mock never raises from raise_for_status(), so the HTTP error has
    # to be configured explicitly for the rate-limit branch to be exercised.
    rate_limited = Mock()
    rate_limited.status_code = 429
    rate_limited.raise_for_status.side_effect = HTTPError(
        "429 Too Many Requests", response=rate_limited
    )
    success = Mock()
    success.status_code = 200
    success.json.return_value = {"feeds": [{"id": "feed1"}], "operators": []}
    mock_get.side_effect = [rate_limited, success]

    result = processor.get_data(
        "https://api.transit.land",
        "dummy_api_key",
        session=RequestsSession(),
        max_retries=2,
    )
    assert mock_get.call_count == 2
    mock_sleep.assert_called_once()
    assert result == {"feeds": [{"id": "feed1"}], "operators": []}


def test_extract_feeds_data(processor):
    feeds_data = {
        "feeds": [
            {
                "id": "feed1",
                "urls": {"static_current": "http://example.com/feed1"},
                "spec": "gtfs",
                "onestop_id": "onestop1",
                "authorization": {},
            }
        ]
    }
    result = processor.extract_feeds_data(feeds_data)
    assert len(result) == 1
    assert result[0]["feed_id"] == "feed1"


def test_extract_operators_data(processor):
    operators_data = {
        "operators": [
            {
                "name": "Operator 1",
                "feeds": [{"id": "feed1"}],
                "agencies": [{"places": [{"adm0_name": "USA"}]}],
            }
        ]
    }
    result = processor.extract_operators_data(operators_data)
    assert len(result) == 1
    assert result[0]["operator_name"] == "Operator 1"


def test_check_external_id(processor):
    mock_db_session = Mock(spec=DBSession)
    mock_db_session.execute.return_value.fetchone.return_value = (1,)
    result = processor.check_external_id(mock_db_session, "onestop1", "TLD")
    assert result is True

    mock_db_session.execute.return_value.fetchone.return_value = None
    result = processor.check_external_id(mock_db_session, "onestop2", "TLD")
    assert result is False


def test_get_mbd_feed_url(processor):
    mock_db_session = Mock(spec=DBSession)
    mock_db_session.execute.return_value.fetchone.return_value = (
        "http://example.com/feed1",
    )
    result = processor.get_mbd_feed_url(mock_db_session, "onestop1", "TLD")
    assert result == "http://example.com/feed1"

    mock_db_session.execute.return_value.fetchone.return_value = None
    result = processor.get_mbd_feed_url(mock_db_session, "onestop2", "TLD")
    assert result is None


def test_process_sync_new_feed(processor):
    mock_db_session = Mock(spec=DBSession)
    feeds_data = {
        "feeds": [
            {
                "id": "feed1",
                "urls": {"static_current": "http://example.com/feed1"},
                "spec": "gtfs",
                "onestop_id": "onestop1",
                "authorization": {},
            }
        ],
        "operators": [],
    }
    operators_data = {
        "operators": [
            {
                "name": "Operator 1",
                "feeds": [{"id": "feed1"}],
                "agencies": [{"places": [{"adm0_name": "USA"}]}],
            }
        ],
        "feeds": [],
    }

    processor.get_data = Mock(side_effect=[feeds_data, operators_data])

    processor.check_url_status = Mock(return_value=True)

    with patch.object(processor, "check_external_id", return_value=False):
        payloads = processor.process_sync(
            db_session=mock_db_session, execution_id="exec123"
        )
        assert len(payloads) == 1
        assert payloads[0].payload.payload_type == "new"
        assert payloads[0].payload.external_id == "onestop1"


def test_process_sync_updated_feed(processor):
    mock_db_session = Mock(spec=DBSession)
    feeds_data = {
        "feeds": [
            {
                "id": "feed1",
                "urls": {"static_current": "http://example.com/feed1_updated"},
                "spec": "gtfs",
                "onestop_id": "onestop1",
                "authorization": {},
            }
        ],
        "operators": [],
    }
    operators_data = {
        "operators": [
            {
                "name": "Operator 1",
                "feeds": [{"id": "feed1"}],
                "agencies": [{"places": [{"adm0_name": "USA"}]}],
            }
        ],
        "feeds": [],
    }

    processor.get_data = Mock(side_effect=[feeds_data, operators_data])

    processor.check_url_status = Mock(return_value=True)

    processor.check_external_id = Mock(return_value=True)

    processor.get_mbd_feed_url = Mock(return_value="http://example.com/feed1")

    payloads = processor.process_sync(
        db_session=mock_db_session, execution_id="exec123"
    )

    assert len(payloads) == 1
    assert payloads[0].payload.payload_type == "update"
    assert payloads[0].payload.external_id == "onestop1"


@patch("feed_sync_dispatcher_transitland.src.main.TransitFeedSyncProcessor.get_data")
def test_process_sync_unchanged_feed(mock_get_data, processor):
    mock_db_session = Mock(spec=DBSession)
    feeds_data = {
        "feeds": [
            {
                "id": "feed1",
                "urls": {"static_current": "http://example.com/feed1"},
                "spec": "gtfs",
                "onestop_id": "onestop1",
                "authorization": {},
            }
        ],
        "operators": [],
    }
    operators_data = {
        "operators": [
            {
                "name": "Operator 1",
                "feeds": [{"id": "feed1"}],
                "agencies": [{"places": [{"adm0_name": "USA"}]}],
            }
        ],
        "feeds": [],
    }
    mock_get_data.side_effect = [feeds_data, operators_data]

    # check_url_status is mocked so the test makes no real HTTP request and the
    # feed actually reaches the "unchanged" comparison instead of being dropped
    # by the URL filter.
    with patch.object(
        processor, "check_url_status", return_value=True
    ), patch.object(processor, "check_external_id", return_value=True), patch.object(
        processor, "get_mbd_feed_url", return_value="http://example.com/feed1"
    ) as mock_get_mbd_feed_url:
        payloads = processor.process_sync(
            db_session=mock_db_session, execution_id="exec123"
        )
        mock_get_mbd_feed_url.assert_called_once()
        assert (
            len(payloads) == 0
        )  # No payload should be created since feed hasn't changed


@patch("feed_sync_dispatcher_transitland.src.main.requests.head")
def test_check_url_status(mock_head, processor):
    mock_head.return_value.status_code = 200
    result = processor.check_url_status("http://example.com")
    assert result is True

    mock_head.return_value.status_code = 404
    result = processor.check_url_status("http://example.com")
    assert result is False


def test_merge_and_filter_dataframes(processor):
    # NOTE(review): this test replays the pandas steps of process_sync by hand
    # and never calls the processor, so it does not guard the production code.
    operators = [
        {
            "operator_name": "Operator 1",
            "operator_feed_id": "feed1",
            "country": "USA",
            "state_province": "CA",
            "city_name": "San Francisco",
        },
        {
            "operator_name": "Operator 2",
            "operator_feed_id": "feed2",
            "country": "Japan",
            "state_province": "Tokyo",
            "city_name": "Tokyo",
        },
    ]
    feeds = [
        {
            "feed_id": "feed1",
            "feed_url": "http://example.com/feed1",
            "spec": "gtfs",
            "feeds_onestop_id": "onestop1",
            "auth_info_url": None,
            "auth_param_name": None,
            "type": None,
        },
        {
            "feed_id": "feed2",
            "feed_url": "http://example.com/feed2",
            "spec": "gtfs",
            "feeds_onestop_id": "onestop2",
            "auth_info_url": None,
            "auth_param_name": None,
            "type": None,
        },
    ]

    operators_df = pd.DataFrame(operators)
    feeds_df = pd.DataFrame(feeds)

    combined_df = pd.merge(
        operators_df,
        feeds_df,
        left_on="operator_feed_id",
        right_on="feed_id",
        how="inner",
    )
    combined_df = combined_df[combined_df["feed_url"].notna()]
    countries_not_included = ["France", "Japan"]
    filtered_df = combined_df[
        ~combined_df["country"]
        .str.lower()
        .isin([c.lower() for c in countries_not_included])
    ]

    assert len(filtered_df) == 1
    assert filtered_df.iloc[0]["operator_name"] == "Operator 1"
    assert filtered_df.iloc[0]["feed_id"] == "feed1"
@dataclass
class FeedSyncPayload:
    """
    Data class for feed sync payloads.

    Pairs the identifier of a feed in the external source with the
    source-specific payload object built by a FeedSyncProcessor.
    """

    # Identifier of the feed in the external source.
    external_id: str
    # Source-specific payload object; its type is defined by each processor.
    payload: Any
def feed_sync_dispatcher(
    feed_sync_processor: FeedSyncProcessor, pubsub_topic_path: str, execution_id: str
):
    """
    Processes an API feed sync and publishes one event per payload to a Pub/Sub topic to be processed.
    :param feed_sync_processor: FeedSyncProcessor that builds the payloads and receives the publish callbacks
    :param pubsub_topic_path: path of the Pub/Sub topic to publish to
    :param execution_id: execution ID
    :raises Exception: when the database session cannot be started or the processor fails;
                       the original error is chained as the cause
    :return: None
    """
    publisher = get_pubsub_client()
    # Initialised up front: if start_db_session raises, the finally clause must
    # not fail on an unbound name and hide the real error.
    session = None
    try:
        session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
        payloads = feed_sync_processor.process_sync(session, execution_id)
    except Exception as error:
        logging.error(f"Error processing feeds sync: {error}")
        raise Exception(f"Error processing feeds sync: {error}") from error
    finally:
        if session is not None:
            close_db_session(session)

    # The base FeedSyncProcessor.process_sync returns None
    payloads = payloads or []
    logging.info(f"Total feeds to add/update: {len(payloads)}.")

    for payload in payloads:
        data_str = json.dumps(payload.payload.__dict__)
        logging.info(f"Publishing {data_str} to {pubsub_topic_path}.")
        future = publish(publisher, pubsub_topic_path, data_str.encode("utf-8"))
        # The payload is bound as a default argument and the completed future is
        # taken from the callback argument. A closure over the loop variables
        # would be evaluated when the callback runs, by which time they may
        # already refer to a later payload/future.
        future.add_done_callback(
            lambda done_future, published=payload: feed_sync_processor.publish_callback(
                done_future, published, pubsub_topic_path
            )
        )

    logging.info(
        f"Publish completed. Published {len(payloads)} feeds to {pubsub_topic_path}."
    )
def get_pubsub_client():
    """
    Builds a new Pub/Sub publisher client.
    :return: a pubsub_v1.PublisherClient instance
    """
    client = pubsub_v1.PublisherClient()
    return client


def publish(
    publisher: "PublisherClient", topic_path: str, data_bytes: bytes
) -> "Future":
    """
    Sends the given bytes to a Pub/Sub topic.
    :param publisher: publisher client used to send the message
    :param topic_path: path of the topic to publish to
    :param data_bytes: message body
    :return: the value returned by publisher.publish
    """
    publish_future = publisher.publish(topic_path, data=data_bytes)
    return publish_future


def get_execution_id(request, prefix: str) -> str:
    """
    Builds the execution ID for a request: the prefix followed by the value of the
    X-Cloud-Trace-Context header when present, otherwise by a random UUID.
    :param request: HTTP request object
    :param prefix: prefix for the execution ID. Example: "batch-datasets"
    :return: execution ID in the form "<prefix>-<suffix>"
    """
    trace_header = request.headers.get("X-Cloud-Trace-Context")
    if trace_header:
        suffix = trace_header
    else:
        # No (or empty) trace header: fall back to a freshly generated UUID
        suffix = uuid.uuid4()
    return f"{prefix}-{suffix}"
b/scripts/pubsub_message_print.sh new file mode 100755 index 000000000..23196d7ca --- /dev/null +++ b/scripts/pubsub_message_print.sh @@ -0,0 +1,126 @@ +#!/bin/bash + +# +# MobilityData 2023 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + + +# This script uses the Pub/Sub emulator to create a subscription and count the number of messages in a topic. +# If you are debugging locally with a consumer of the topic, +# you should not be running this script if the consumer shares the same subscription name. +# As this script and any consumer sharing the same subscription will be competing for the messages in the topic. +# If consumers have different subscription names, they will each receive a copy of the messages. +# +# Requires the jq command-line JSON processor: https://stedolan.github.io/jq/ +# +# Usage: ./pubsub_message_print.sh <topic_name> + + + +export PUBSUB_EMULATOR_HOST="localhost:8043" + + +PROJECT="test-project" +SUBSCRIPTION_NAME="my-subscription" + + +TOPIC_NAME="$1" + + +if [ -z "$TOPIC_NAME" ]; then + echo "Error: No topic name provided." + echo "Usage: ./pubsub_message_count.sh " + exit 1 +fi + + +create_subscription() { + echo "Creating subscription: $SUBSCRIPTION_NAME" + SUBSCRIPTION_URL="http://$PUBSUB_EMULATOR_HOST/v1/projects/$PROJECT/subscriptions/$SUBSCRIPTION_NAME" + TOPIC_URL="projects/$PROJECT/topics/$TOPIC_NAME" + + BODY=$(cat <