diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index 5463226..2e914e8 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -22,20 +22,42 @@ jobs: uses: actions/checkout@v4 - name: Install Nix - uses: cachix/install-nix-action@v24 + uses: cachix/install-nix-action@v25 with: nix_path: nixpkgs=channel:nixpkgs-23.11-darwin - extra_nix_config: | - trusted-public-keys = cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY= - substituters = https://cache.nixos.org/ + + - name: Extract nixpkgs hash + run: | + nixpkgs_hash=$(grep -o -P '(?<=archive/)[0-9a-f]{40}(?=\.tar\.gz)' shell.nix) + echo "NIXPKGS_HASH=$nixpkgs_hash" >> $GITHUB_ENV + + - name: Cache Nix store + uses: actions/cache@v4 + id: nix-cache + with: + key: nix-${{ runner.os }}-${{ env.NIXPKGS_HASH }} + path: /tmp/nix-cache + + - name: Import Nix store cache + if: steps.nix-cache.outputs.cache-hit == 'true' + run: | + nix-store --import < /tmp/nix-cache + + - name: Cache Python packages + uses: actions/cache@v4 + with: + key: python-${{ runner.os }}-${{ hashFiles('poetry.lock') }} + path: .venv - name: Install dependencies run: | nix-shell --pure --run true + nix-shell --pure --arg isDevelopment false --run true - - name: Build Cython modules + - name: Export Nix store cache + if: steps.nix-cache.outputs.cache-hit != 'true' run: | - nix-shell --pure --run cython-build + nix-store --export $(find /nix/store -maxdepth 1 -name '*-*') > /tmp/nix-cache - name: Build container image run: | diff --git a/api/v1/api.py b/api/v1/__init__.py similarity index 89% rename from api/v1/api.py rename to api/v1/__init__.py index 312a85e..4c23146 100644 --- a/api/v1/api.py +++ b/api/v1/__init__.py @@ -5,7 +5,7 @@ import api.v1.photos as photos import api.v1.tile as tile -router = APIRouter() +router = APIRouter(prefix='/api/v1') router.include_router(countries.router) router.include_router(node.router) router.include_router(photos.router) diff --git 
a/api/v1/countries.py b/api/v1/countries.py index b340c12..2725c33 100644 --- a/api/v1/countries.py +++ b/api/v1/countries.py @@ -1,44 +1,35 @@ from datetime import timedelta from typing import Annotated -import anyio -from anyio.streams.memory import MemoryObjectSendStream +from anyio import create_task_group from fastapi import APIRouter, Path, Response +from sentry_sdk import start_span from shapely.geometry import mapping from middlewares.cache_middleware import configure_cache from models.country import Country -from states.aed_state import AEDState, AEDStateDep -from states.country_state import CountryStateDep +from states.aed_state import AEDState +from states.country_state import CountryState router = APIRouter(prefix='/countries') -async def _count_aed_in_country(country: Country, aed_state: AEDState, send_stream: MemoryObjectSendStream) -> None: - count = await aed_state.count_aeds_by_country_code(country.code) - await send_stream.send((country, count)) - - @router.get('/names') @configure_cache(timedelta(hours=1), stale=timedelta(days=7)) -async def get_names( - country_state: CountryStateDep, - aed_state: AEDStateDep, - language: str | None = None, -): - countries = await country_state.get_all_countries() +async def get_names(language: str | None = None): + countries = await CountryState.get_all_countries() + country_count_map: dict[str, int] = {} - send_stream, receive_stream = anyio.create_memory_object_stream() - country_count_map = {} + with start_span(description='Counting AEDs'): - async with anyio.create_task_group() as tg, send_stream, receive_stream: - for country in countries: - tg.start_soon(_count_aed_in_country, country, aed_state, send_stream) - - for _ in range(len(countries)): - country, count = await receive_stream.receive() + async def count_task(country: Country) -> None: + count = await AEDState.count_aeds_by_country_code(country.code) country_count_map[country.name] = count + async with create_task_group() as tg: + for country in 
countries: + tg.start_soon(count_task, country) + def limit_country_names(names: dict[str, str]): if language and (name := names.get(language)): return {language: name} @@ -67,13 +58,11 @@ def limit_country_names(names: dict[str, str]): async def get_geojson( response: Response, country_code: Annotated[str, Path(min_length=2, max_length=5)], - country_state: CountryStateDep, - aed_state: AEDStateDep, ): if country_code == 'WORLD': - aeds = await aed_state.get_all_aeds() + aeds = await AEDState.get_all_aeds() else: - aeds = await aed_state.get_aeds_by_country_code(country_code) + aeds = await AEDState.get_aeds_by_country_code(country_code) response.headers['Content-Disposition'] = 'attachment' response.headers['Content-Type'] = 'application/geo+json; charset=utf-8' @@ -82,7 +71,7 @@ async def get_geojson( 'features': [ { 'type': 'Feature', - 'geometry': mapping(aed.position.shapely), + 'geometry': mapping(aed.position), 'properties': { '@osm_type': 'node', '@osm_id': aed.id, diff --git a/api/v1/node.py b/api/v1/node.py index f37515f..2738658 100644 --- a/api/v1/node.py +++ b/api/v1/node.py @@ -4,12 +4,12 @@ from fastapi import APIRouter, HTTPException from pytz import timezone +from shapely import Point from tzfpy import get_tz from middlewares.cache_middleware import configure_cache -from models.lonlat import LonLat -from states.aed_state import AEDStateDep -from states.photo_state import PhotoStateDep +from states.aed_state import AEDState +from states.photo_state import PhotoState from utils import get_wikimedia_commons_url router = APIRouter() @@ -17,8 +17,8 @@ photo_id_re = re.compile(r'view/(?P\S+)\.') -def _get_timezone(lonlat: LonLat) -> tuple[str | None, str | None]: - timezone_name: str | None = get_tz(lonlat.lon, lonlat.lat) +def _get_timezone(position: Point) -> tuple[str | None, str | None]: + timezone_name: str | None = get_tz(position.x, position.y) timezone_offset = None if timezone_name: @@ -32,14 +32,14 @@ def _get_timezone(lonlat: LonLat) -> 
tuple[str | None, str | None]: return timezone_name, timezone_offset -async def _get_image_data(tags: dict[str, str], photo_state: PhotoStateDep) -> dict: +async def _get_image_data(tags: dict[str, str]) -> dict: image_url: str = tags.get('image', '') if ( image_url and (photo_id_match := photo_id_re.search(image_url)) and (photo_id := photo_id_match.group('id')) - and (photo_info := await photo_state.get_photo_by_id(photo_id)) + and (photo_info := await PhotoState.get_photo_by_id(photo_id)) ): return { '@photo_id': photo_info.id, @@ -72,13 +72,13 @@ async def _get_image_data(tags: dict[str, str], photo_state: PhotoStateDep) -> d @router.get('/node/{node_id}') @configure_cache(timedelta(minutes=1), stale=timedelta(minutes=5)) -async def get_node(node_id: int, aed_state: AEDStateDep, photo_state: PhotoStateDep): - aed = await aed_state.get_aed_by_id(node_id) +async def get_node(node_id: int): + aed = await AEDState.get_aed_by_id(node_id) if aed is None: raise HTTPException(404, f'Node {node_id} not found') - photo_dict = await _get_image_data(aed.tags, photo_state) + photo_dict = await _get_image_data(aed.tags) timezone_name, timezone_offset = _get_timezone(aed.position) timezone_dict = { @@ -97,8 +97,8 @@ async def get_node(node_id: int, aed_state: AEDStateDep, photo_state: PhotoState **timezone_dict, 'type': 'node', 'id': aed.id, - 'lat': aed.position.lat, - 'lon': aed.position.lon, + 'lat': aed.position.y, + 'lon': aed.position.x, 'tags': aed.tags, 'version': aed.version, } diff --git a/api/v1/photos.py b/api/v1/photos.py index f46f9c6..309f8d7 100644 --- a/api/v1/photos.py +++ b/api/v1/photos.py @@ -14,9 +14,9 @@ from middlewares.cache_middleware import configure_cache from openstreetmap import OpenStreetMap, osm_user_has_active_block from osm_change import update_node_tags_osm_change -from states.aed_state import AEDStateDep -from states.photo_report_state import PhotoReportStateDep -from states.photo_state import PhotoStateDep +from states.aed_state import 
AEDState +from states.photo_report_state import PhotoReportState +from states.photo_state import PhotoState from utils import get_http_client, get_wikimedia_commons_url router = APIRouter(prefix='/photos') @@ -51,8 +51,8 @@ async def _fetch_image(url: str) -> tuple[bytes, str]: @router.get('/view/{id}.webp') @configure_cache(timedelta(days=365), stale=timedelta(days=365)) -async def view(id: str, photo_state: PhotoStateDep) -> FileResponse: - info = await photo_state.get_photo_by_id(id) +async def view(id: str) -> FileResponse: + info = await PhotoState.get_photo_by_id(id) if info is None: raise HTTPException(404, f'Photo {id!r} not found') @@ -94,8 +94,6 @@ async def upload( file_license: Annotated[str, Form()], file: Annotated[UploadFile, File()], oauth2_credentials: Annotated[str, Form()], - aed_state: AEDStateDep, - photo_state: PhotoStateDep, ) -> bool: file_license = file_license.upper() accept_licenses = ('CC0',) @@ -117,7 +115,7 @@ async def upload( if 'access_token' not in oauth2_credentials_: raise HTTPException(400, 'OAuth2 credentials must contain an access_token field') - aed = await aed_state.get_aed_by_id(node_id) + aed = await AEDState.get_aed_by_id(node_id) if aed is None: raise HTTPException(404, f'Node {node_id} not found, perhaps it is not an AED?') @@ -129,7 +127,7 @@ async def upload( if osm_user_has_active_block(osm_user): raise HTTPException(403, 'User has an active block on OpenStreetMap') - photo_info = await photo_state.set_photo(node_id, osm_user['id'], file) + photo_info = await PhotoState.set_photo(node_id, osm_user['id'], file) photo_url = f'{request.base_url}api/v1/photos/view/{photo_info.id}.webp' node_xml = await osm.get_node_xml(node_id) @@ -147,26 +145,19 @@ async def upload( @router.post('/report') -async def report( - id: Annotated[str, Form()], - photo_report_state: PhotoReportStateDep, -) -> bool: - return await photo_report_state.report_by_photo_id(id) +async def report(id: Annotated[str, Form()]) -> bool: + return await 
PhotoReportState.report_by_photo_id(id) @router.get('/report/rss.xml') -async def report_rss( - request: Request, - photo_state: PhotoStateDep, - photo_report_state: PhotoReportStateDep, -) -> Response: +async def report_rss(request: Request) -> Response: fg = FeedGenerator() fg.title('AED Photo Reports') fg.description('This feed contains a list of recent AED photo reports') fg.link(href=str(request.url), rel='self') - for report in await photo_report_state.get_recent_reports(): - info = await photo_state.get_photo_by_id(report.photo_id) + for report in await PhotoReportState.get_recent_reports(): + info = await PhotoState.get_photo_by_id(report.photo_id) if info is None: continue diff --git a/api/v1/tile.py b/api/v1/tile.py index b688a9b..031deac 100644 --- a/api/v1/tile.py +++ b/api/v1/tile.py @@ -1,12 +1,14 @@ from collections.abc import Sequence from itertools import chain +from math import atan, degrees, pi, sinh from typing import Annotated -import anyio import mapbox_vector_tile as mvt import numpy as np -from anyio.streams.memory import MemoryObjectSendStream +from anyio import create_task_group from fastapi import APIRouter, Path, Response +from sentry_sdk import start_span, trace +from shapely import Point from shapely.ops import transform from config import ( @@ -20,68 +22,98 @@ TILE_MAX_Z, TILE_MIN_Z, ) -from cython_lib.geo_utils import tile_to_bbox from middlewares.cache_middleware import make_cache_control from models.aed import AED from models.bbox import BBox from models.country import Country -from states.aed_state import AEDState, AEDStateDep -from states.country_state import CountryState, CountryStateDep -from utils import abbreviate, print_run_time +from states.aed_state import AEDState +from states.country_state import CountryState +from utils import abbreviate router = APIRouter() -async def _count_aed_in_country(country: Country, aed_state: AEDState, send_stream: MemoryObjectSendStream) -> None: - count = await 
aed_state.count_aeds_by_country_code(country.code) - await send_stream.send((country, count)) +def _tile_to_point(z: int, x: int, y: int) -> Point: + n = 2**z + lon_deg = x / n * 360.0 - 180.0 + lat_rad = atan(sinh(pi * (1 - 2 * y / n))) + lat_deg = degrees(lat_rad) + return Point(lon_deg, lat_deg) -def _mvt_rescale(x, y, x_min: float, y_min: float, x_span: float, y_span: float) -> tuple: +def _tile_to_bbox(z: int, x: int, y: int) -> BBox: + p1 = _tile_to_point(z, x, y + 1) + p2 = _tile_to_point(z, x + 1, y) + return BBox(p1, p2) + + +@router.get('/tile/{z}/{x}/{y}.mvt') +async def get_tile( + z: Annotated[int, Path(ge=TILE_MIN_Z, le=TILE_MAX_Z)], + x: Annotated[int, Path(ge=0)], + y: Annotated[int, Path(ge=0)], +): + bbox = _tile_to_bbox(z, x, y) + + if z <= TILE_COUNTRIES_MAX_Z: + content = await _get_tile_country(z, bbox) + headers = {'Cache-Control': make_cache_control(TILE_COUNTRIES_CACHE_MAX_AGE, TILE_COUNTRIES_CACHE_STALE)} + else: + content = await _get_tile_aed(z, bbox) + headers = {'Cache-Control': make_cache_control(DEFAULT_CACHE_MAX_AGE, TILE_AEDS_CACHE_STALE)} + + return Response(content, headers=headers, media_type='application/vnd.mapbox-vector-tile') + + +def _mvt_rescale(x: float, y: float, x_min: float, y_min: float, x_span: float, y_span: float) -> tuple[int, int]: x_mvt, y_mvt = MVT_TRANSFORMER.transform(np.array(x), np.array(y)) # subtract minimum boundary and scale to MVT extent x_scaled = np.rint((x_mvt - x_min) / x_span * MVT_EXTENT).astype(int) y_scaled = np.rint((y_mvt - y_min) / y_span * MVT_EXTENT).astype(int) - return x_scaled, y_scaled def _mvt_encode(bbox: BBox, data: Sequence[dict]) -> bytes: - x_min, y_min = MVT_TRANSFORMER.transform(*bbox.p1) - x_max, y_max = MVT_TRANSFORMER.transform(*bbox.p2) + x_min, y_min = MVT_TRANSFORMER.transform(bbox.p1.x, bbox.p1.y) + x_max, y_max = MVT_TRANSFORMER.transform(bbox.p2.x, bbox.p2.y) x_span = x_max - x_min y_span = y_max - y_min - with print_run_time('Transforming MVT geometry'): + with 
start_span(description='Transforming MVT geometry'): for feature in chain.from_iterable(d['features'] for d in data): feature['geometry'] = transform( func=lambda x, y: _mvt_rescale(x, y, x_min, y_min, x_span, y_span), geom=feature['geometry'], ) - with print_run_time('Encoding MVT'): - return mvt.encode(data, default_options={'extents': MVT_EXTENT}) + with start_span(description='Encoding MVT'): + return mvt.encode( + data, + default_options={ + 'extents': MVT_EXTENT, + 'check_winding_order': False, + }, + ) -async def _get_tile_country(z: int, bbox: BBox, country_state: CountryState, aed_state: AEDState) -> bytes: - with print_run_time('Querying countries'): - countries = await country_state.get_countries_within(bbox) +@trace +async def _get_tile_country(z: int, bbox: BBox) -> bytes: + countries = await CountryState.get_countries_within(bbox) + country_count_map: dict[str, str] = {} - simplify_tol = 0.5 / 2**z if z < TILE_MAX_Z else None - geometries = (country.geometry.simplify(simplify_tol, preserve_topology=False) for country in countries) + with start_span(description='Counting AEDs'): - send_stream, receive_stream = anyio.create_memory_object_stream() - country_count_map = {} + async def count_task(country: Country) -> None: + count = await AEDState.count_aeds_by_country_code(country.code) + country_count_map[country.name] = (count, abbreviate(count)) - with print_run_time('Counting AEDs'): - async with anyio.create_task_group() as tg, send_stream, receive_stream: + async with create_task_group() as tg: for country in countries: - tg.start_soon(_count_aed_in_country, country, aed_state, send_stream) + tg.start_soon(count_task, country) - for _ in range(len(countries)): - country, count = await receive_stream.receive() - country_count_map[country.name] = (count, abbreviate(count)) + simplify_tol = 0.5 / 2**z if z < TILE_MAX_Z else None + geometries = (country.geometry.simplify(simplify_tol, preserve_topology=False) for country in countries) return 
_mvt_encode( bbox, @@ -100,7 +132,7 @@ async def _get_tile_country(z: int, bbox: BBox, country_state: CountryState, aed 'name': 'defibrillators', 'features': [ { - 'geometry': country.label.position.shapely, + 'geometry': country.label_position, 'properties': { 'country_name': country.name, 'country_code': country.code, @@ -115,9 +147,10 @@ async def _get_tile_country(z: int, bbox: BBox, country_state: CountryState, aed ) -async def _get_tile_aed(z: int, bbox: BBox, aed_state: AEDState) -> bytes: - group_eps = 9.8 / 2**z if z < TILE_MAX_Z else None - aeds = await aed_state.get_aeds_within(bbox.extend(0.5), group_eps) +@trace +async def _get_tile_aed(z: int, bbox: BBox) -> bytes: + group_eps = 9.6 / 2**z if z < TILE_MAX_Z else None + aeds = await AEDState.get_aeds_within_bbox(bbox.extend(0.5), group_eps) return _mvt_encode( bbox, @@ -126,7 +159,7 @@ async def _get_tile_aed(z: int, bbox: BBox, aed_state: AEDState) -> bytes: 'name': 'defibrillators', 'features': [ { - 'geometry': aed.position.shapely, + 'geometry': aed.position, 'properties': { 'node_id': aed.id, 'access': aed.access, @@ -134,7 +167,7 @@ async def _get_tile_aed(z: int, bbox: BBox, aed_state: AEDState) -> bytes: } if isinstance(aed, AED) else { - 'geometry': aed.position.shapely, + 'geometry': aed.position, 'properties': { 'point_count': aed.count, 'point_count_abbreviated': abbreviate(aed.count), @@ -146,25 +179,3 @@ async def _get_tile_aed(z: int, bbox: BBox, aed_state: AEDState) -> bytes: } ], ) - - -@router.get('/tile/{z}/{x}/{y}.mvt') -async def get_tile( - z: Annotated[int, Path(ge=TILE_MIN_Z, le=TILE_MAX_Z)], - x: Annotated[int, Path(ge=0)], - y: Annotated[int, Path(ge=0)], - country_state: CountryStateDep, - aed_state: AEDStateDep, -): - bbox = tile_to_bbox(z, x, y) - assert bbox.p1.lon <= bbox.p2.lon, f'{bbox.p1.lon=} <= {bbox.p2.lon=}' - assert bbox.p1.lat <= bbox.p2.lat, f'{bbox.p1.lat=} <= {bbox.p2.lat=}' - - if z <= TILE_COUNTRIES_MAX_Z: - content = await _get_tile_country(z, bbox, 
country_state, aed_state) - headers = {'Cache-Control': make_cache_control(TILE_COUNTRIES_CACHE_MAX_AGE, TILE_COUNTRIES_CACHE_STALE)} - else: - content = await _get_tile_aed(z, bbox, aed_state) - headers = {'Cache-Control': make_cache_control(DEFAULT_CACHE_MAX_AGE, TILE_AEDS_CACHE_STALE)} - - return Response(content, headers=headers, media_type='application/vnd.mapbox-vector-tile') diff --git a/config.py b/config.py index 9136c6d..28af605 100644 --- a/config.py +++ b/config.py @@ -2,20 +2,38 @@ from datetime import timedelta import pymongo +import sentry_sdk from anyio import Path from motor.core import AgnosticDatabase from motor.motor_asyncio import AsyncIOMotorClient from pymongo import IndexModel from pyproj import Transformer +from sentry_sdk.integrations.asyncio import AsyncioIntegration +from sentry_sdk.integrations.pymongo import PyMongoIntegration NAME = 'openaedmap-backend' -VERSION = '2.6.3' +VERSION = '2.7.3' CREATED_BY = f'{NAME} {VERSION}' WEBSITE = 'https://openaedmap.org' ENVIRONMENT = os.getenv('ENVIRONMENT', None) USER_AGENT = f'{NAME}/{VERSION} (+{WEBSITE})' +if ENVIRONMENT: + sentry_sdk.init( + dsn='https://40b1753c3f72721489ca0bca38bb4566@sentry.monicz.dev/3', + release=VERSION, + environment=ENVIRONMENT, + enable_tracing=True, + traces_sample_rate=0.2, + trace_propagation_targets=None, + profiles_sample_rate=0.2, + integrations=[ + AsyncioIntegration(), + PyMongoIntegration(), + ], + ) + OVERPASS_API_URL = 'https://overpass-api.de/api/interpreter' OPENSTREETMAP_API_URL = os.getenv('OPENSTREETMAP_API_URL', 'https://api.openstreetmap.org/api/0.6/') REPLICATION_URL = 'https://planet.openstreetmap.org/replication/minute/' diff --git a/cython_lib/geo_utils.py b/cython_lib/geo_utils.py deleted file mode 100644 index 6d2eaba..0000000 --- a/cython_lib/geo_utils.py +++ /dev/null @@ -1,33 +0,0 @@ -import cython - -from models.bbox import BBox -from models.lonlat import LonLat - -if cython.compiled: - from cython.cimports.libc.math import atan, pi, sinh 
- - print(f'{__name__}: 🐇 compiled') -else: - from math import atan, pi, sinh - - print(f'{__name__}: 🐌 not compiled') - - -@cython.cfunc -def _degrees(radians: cython.double) -> cython.double: - return radians * 180 / pi - - -@cython.cfunc -def _tile_to_lonlat(z: cython.int, x: cython.int, y: cython.int) -> tuple[cython.double, cython.double]: - n: cython.double = 2**z - lon_deg: cython.double = x / n * 360.0 - 180.0 - lat_rad: cython.double = atan(sinh(pi * (1 - 2 * y / n))) - lat_deg: cython.double = _degrees(lat_rad) - return lon_deg, lat_deg - - -def tile_to_bbox(z: cython.int, x: cython.int, y: cython.int) -> BBox: - p1_lon, p1_lat = _tile_to_lonlat(z, x, y) - p2_lon, p2_lat = _tile_to_lonlat(z, x + 1, y + 1) - return BBox(LonLat(p1_lon, p2_lat), LonLat(p2_lon, p1_lat)) diff --git a/default.nix b/default.nix index 5643c71..b991f35 100644 --- a/default.nix +++ b/default.nix @@ -27,21 +27,17 @@ with pkgs; dockerTools.buildLayeredImage { set -e mkdir tmp mkdir app && cd app - cp "${./.}"/LICENSE . cp "${./.}"/*.py . cp -r "${./.}"/api . - cp -r "${./.}"/cython_lib . cp -r "${./.}"/middlewares . cp -r "${./.}"/models . cp -r "${./.}"/states . - export PATH="${lib.makeBinPath shell.buildInputs}:$PATH" - ${shell.shellHook} + cp -r "${./.}"/validators . 
''; config = { WorkingDir = "/app"; Env = [ - "LD_LIBRARY_PATH=${lib.makeLibraryPath shell.libraries}" "PYTHONPATH=${python-venv}/lib" "PYTHONUNBUFFERED=1" "PYTHONDONTWRITEBYTECODE=1" diff --git a/main.py b/main.py index 5b8d0bc..88f12b5 100644 --- a/main.py +++ b/main.py @@ -1,41 +1,32 @@ from contextlib import asynccontextmanager from datetime import timedelta -import anyio -import sentry_sdk +from anyio import create_task_group from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.base import BaseHTTPMiddleware -import api.v1.api as api -from config import DEFAULT_CACHE_MAX_AGE, DEFAULT_CACHE_STALE, ENVIRONMENT, VERSION, startup_setup +import api.v1 as v1 +from config import DEFAULT_CACHE_MAX_AGE, DEFAULT_CACHE_STALE, startup_setup from middlewares.cache_middleware import CacheMiddleware from middlewares.profiler_middleware import profiler_middleware from middlewares.version_middleware import version_middleware from orjson_response import CustomORJSONResponse -from states.aed_state import get_aed_state -from states.country_state import get_country_state -from states.worker_state import WorkerStateEnum, get_worker_state - -if ENVIRONMENT: - sentry_sdk.init( - dsn='https://40b1753c3f72721489ca0bca38bb4566@sentry.monicz.dev/3', - release=VERSION, - environment=ENVIRONMENT, - trace_propagation_targets=None, - ) +from states.aed_state import AEDState +from states.country_state import CountryState +from states.worker_state import WorkerState, WorkerStateEnum @asynccontextmanager async def lifespan(_): - worker_state = get_worker_state() + worker_state = WorkerState() await worker_state.ainit() if worker_state.is_primary: await startup_setup() - async with anyio.create_task_group() as tg: - await tg.start(get_country_state().update_db_task) - await tg.start(get_aed_state().update_db_task) + async with create_task_group() as tg: + await tg.start(CountryState.update_db_task) + await tg.start(AEDState.update_db_task) 
await worker_state.set_state(WorkerStateEnum.RUNNING) yield @@ -48,7 +39,7 @@ async def lifespan(_): app = FastAPI(lifespan=lifespan, default_response_class=CustomORJSONResponse) -app.include_router(api.router, prefix='/api/v1') +app.include_router(v1.router) app.add_middleware(BaseHTTPMiddleware, dispatch=profiler_middleware) app.add_middleware(BaseHTTPMiddleware, dispatch=version_middleware) app.add_middleware( diff --git a/models/aed.py b/models/aed.py index 5fa9a6c..b24b19b 100644 --- a/models/aed.py +++ b/models/aed.py @@ -1,12 +1,16 @@ -from dataclasses import dataclass +from typing import Annotated -from models.lonlat import LonLat +from pydantic import BaseModel, ConfigDict +from shapely import Point +from validators.geometry import GeometrySerializer, GeometryValidator + + +class AED(BaseModel): + model_config = ConfigDict(frozen=True, strict=True) -@dataclass(frozen=True, slots=True) -class AED: id: int - position: LonLat + position: Annotated[Point, GeometryValidator, GeometrySerializer] country_codes: list[str] | None tags: dict[str, str] version: int diff --git a/models/aed_group.py b/models/aed_group.py index 58f75a3..0108c1d 100644 --- a/models/aed_group.py +++ b/models/aed_group.py @@ -1,12 +1,11 @@ from collections.abc import Iterable -from dataclasses import dataclass +from typing import NamedTuple -from models.lonlat import LonLat +from shapely import Point -@dataclass(frozen=True, slots=True) -class AEDGroup: - position: LonLat +class AEDGroup(NamedTuple): + position: Point count: int access: str diff --git a/models/bbox.py b/models/bbox.py index 89d0716..6699274 100644 --- a/models/bbox.py +++ b/models/bbox.py @@ -1,60 +1,63 @@ -from collections.abc import Sequence -from itertools import chain from typing import NamedTuple, Self +import numpy as np +from shapely import Point from shapely.geometry import Polygon -from models.lonlat import LonLat - class BBox(NamedTuple): - p1: LonLat - p2: LonLat + p1: Point + p2: Point def extend(self, 
percentage: float) -> Self: - lon_span = self.p2.lon - self.p1.lon - lat_span = self.p2.lat - self.p1.lat + lon_span = self.p2.x - self.p1.x + lat_span = self.p2.y - self.p1.y lon_delta = lon_span * percentage lat_delta = lat_span * percentage - new_p1_lon = max(-180, min(180, self.p1.lon - lon_delta)) - new_p1_lat = max(-90, min(90, self.p1.lat - lat_delta)) - new_p2_lon = max(-180, min(180, self.p2.lon + lon_delta)) - new_p2_lat = max(-90, min(90, self.p2.lat + lat_delta)) + new_p1_lon = max(-180, min(180, self.p1.x - lon_delta)) + new_p1_lat = max(-90, min(90, self.p1.y - lat_delta)) + new_p2_lon = max(-180, min(180, self.p2.x + lon_delta)) + new_p2_lat = max(-90, min(90, self.p2.y + lat_delta)) - return BBox(LonLat(new_p1_lon, new_p1_lat), LonLat(new_p2_lon, new_p2_lat)) + return BBox( + p1=Point(new_p1_lon, new_p1_lat), + p2=Point(new_p2_lon, new_p2_lat), + ) @classmethod def from_tuple(cls, bbox: tuple[float, float, float, float]) -> Self: - return cls(LonLat(bbox[0], bbox[1]), LonLat(bbox[2], bbox[3])) + return cls(Point(bbox[0], bbox[1]), Point(bbox[2], bbox[3])) def to_tuple(self) -> tuple[float, float, float, float]: - return (self.p1.lon, self.p1.lat, self.p2.lon, self.p2.lat) + return (self.p1.x, self.p1.y, self.p2.x, self.p2.y) def to_polygon(self, *, nodes_per_edge: int = 2) -> Polygon: if nodes_per_edge <= 2: return Polygon( [ - (self.p1.lon, self.p1.lat), - (self.p2.lon, self.p1.lat), - (self.p2.lon, self.p2.lat), - (self.p1.lon, self.p2.lat), - (self.p1.lon, self.p1.lat), + (self.p1.x, self.p1.y), + (self.p2.x, self.p1.y), + (self.p2.x, self.p2.y), + (self.p1.x, self.p2.y), + (self.p1.x, self.p1.y), ] ) - x_interval = (self.p2.lon - self.p1.lon) / (nodes_per_edge - 1) - y_interval = (self.p2.lat - self.p1.lat) / (nodes_per_edge - 1) + x_vals = np.linspace(self.p1.x, self.p2.x, nodes_per_edge) + y_vals = np.linspace(self.p1.y, self.p2.y, nodes_per_edge) + + bottom_edge = np.column_stack((x_vals, np.full(nodes_per_edge, self.p1.y))) + top_edge = 
np.column_stack((x_vals, np.full(nodes_per_edge, self.p2.y))) + left_edge = np.column_stack((np.full(nodes_per_edge - 2, self.p1.x), y_vals[1:-1])) + right_edge = np.column_stack((np.full(nodes_per_edge - 2, self.p2.x), y_vals[1:-1])) - bottom_edge = tuple((self.p1.lon + i * x_interval, self.p1.lat) for i in range(nodes_per_edge)) - top_edge = tuple((self.p1.lon + i * x_interval, self.p2.lat) for i in range(nodes_per_edge)) - left_edge = tuple((self.p1.lon, self.p1.lat + i * y_interval) for i in range(1, nodes_per_edge - 1)) - right_edge = tuple((self.p2.lon, self.p1.lat + i * y_interval) for i in range(1, nodes_per_edge - 1)) + all_coords = np.concatenate((bottom_edge, right_edge, top_edge[::-1], left_edge[::-1])) - return Polygon(chain(bottom_edge, right_edge, reversed(top_edge), reversed(left_edge))) + return Polygon(all_coords) - def correct_for_dateline(self) -> Sequence[Self]: - if self.p1.lon > self.p2.lon: - return (BBox(self.p1, LonLat(180, self.p2.lat)), BBox(LonLat(-180, self.p1.lat), self.p2)) + def correct_for_dateline(self) -> tuple[Self, ...]: + if self.p1.x > self.p2.x: + return (BBox(self.p1, Point(180, self.p2.y)), BBox(Point(-180, self.p1.y), self.p2)) else: return (self,) diff --git a/models/country.py b/models/country.py index 9e9b0c1..ccad7a1 100644 --- a/models/country.py +++ b/models/country.py @@ -1,21 +1,19 @@ -from dataclasses import dataclass +from typing import Annotated +from pydantic import BaseModel, ConfigDict +from shapely import Point from shapely.geometry.base import BaseGeometry -from models.lonlat import LonLat +from validators.geometry import GeometrySerializer, GeometryValidator -@dataclass(frozen=True, slots=True) -class CountryLabel: - position: LonLat +class Country(BaseModel): + model_config = ConfigDict(frozen=True, strict=True) - -@dataclass(frozen=True, slots=True) -class Country: names: dict[str, str] code: str - geometry: BaseGeometry - label: CountryLabel + geometry: Annotated[BaseGeometry, GeometryValidator, 
GeometrySerializer] + label_position: Annotated[Point, GeometryValidator, GeometrySerializer] @property def name(self) -> str: diff --git a/models/lonlat.py b/models/lonlat.py deleted file mode 100644 index 8836254..0000000 --- a/models/lonlat.py +++ /dev/null @@ -1,16 +0,0 @@ -from dataclasses import dataclass - -from shapely import Point - - -@dataclass(frozen=True, slots=True) -class LonLat: - lon: float - lat: float - - def __iter__(self): - return iter((self.lon, self.lat)) - - @property - def shapely(self) -> Point: - return Point(self.lon, self.lat) diff --git a/models/photo_info.py b/models/photo_info.py index a9310f3..6c967cc 100644 --- a/models/photo_info.py +++ b/models/photo_info.py @@ -1,12 +1,12 @@ -from dataclasses import dataclass - from anyio import Path +from pydantic import BaseModel, ConfigDict from config import PHOTOS_DIR -@dataclass(frozen=True, slots=True) -class PhotoInfo: +class PhotoInfo(BaseModel): + model_config = ConfigDict(frozen=True, strict=True) + id: str node_id: str user_id: str diff --git a/models/photo_report.py b/models/photo_report.py index baf82da..cfe05b0 100644 --- a/models/photo_report.py +++ b/models/photo_report.py @@ -1,8 +1,9 @@ -from dataclasses import dataclass +from pydantic import BaseModel, ConfigDict -@dataclass(frozen=True, slots=True) -class PhotoReport: +class PhotoReport(BaseModel): + model_config = ConfigDict(frozen=True, strict=True) + id: str photo_id: str timestamp: float diff --git a/openstreetmap.py b/openstreetmap.py index 4416c9f..6b963db 100644 --- a/openstreetmap.py +++ b/openstreetmap.py @@ -2,6 +2,7 @@ import xmltodict from authlib.integrations.httpx_client import OAuth2Auth +from sentry_sdk import trace from config import CHANGESET_ID_PLACEHOLDER, DEFAULT_CHANGESET_TAGS, OPENSTREETMAP_API_URL from utils import get_http_client, retry_exponential @@ -17,6 +18,7 @@ def __init__(self, oauth2_credentials: dict): self._http = get_http_client(OPENSTREETMAP_API_URL, auth=OAuth2Auth(oauth2_credentials)) 
@retry_exponential(timedelta(seconds=10)) + @trace async def get_authorized_user(self) -> dict | None: r = await self._http.get('/user/details.json') @@ -28,6 +30,7 @@ async def get_authorized_user(self) -> dict | None: return r.json()['user'] @retry_exponential(timedelta(seconds=10)) + @trace async def get_node_xml(self, node_id: int) -> dict | None: r = await self._http.get(f'/node/{node_id}') @@ -42,6 +45,7 @@ async def get_node_xml(self, node_id: int) -> dict | None: force_list=('tag',), )['osm']['node'] + @trace async def upload_osm_change(self, osm_change: str) -> str: changeset = xmltodict.unparse( { diff --git a/osm_change.py b/osm_change.py index 18b832b..79c8b1e 100644 --- a/osm_change.py +++ b/osm_change.py @@ -3,8 +3,8 @@ from config import CHANGESET_ID_PLACEHOLDER, CREATED_BY -def _initialize_osm_change_structure() -> dict: - return { +def update_node_tags_osm_change(node_xml: dict, update_tags: dict[str, str]) -> str: + result = { 'osmChange': { '@version': 0.6, '@generator': CREATED_BY, @@ -14,10 +14,6 @@ def _initialize_osm_change_structure() -> dict: } } - -def update_node_tags_osm_change(node_xml: dict, update_tags: dict[str, str]) -> str: - result = _initialize_osm_change_structure() - node_xml.pop('@timestamp', None) node_xml.pop('@user', None) node_xml.pop('@uid', None) diff --git a/osm_countries.py b/osm_countries.py index f511e08..54297c8 100644 --- a/osm_countries.py +++ b/osm_countries.py @@ -2,6 +2,7 @@ import brotlicffi as brotli import orjson +from sentry_sdk import trace from shapely.geometry import shape from config import COUNTRIES_GEOJSON_URL @@ -9,6 +10,7 @@ from utils import get_http_client +@trace async def get_osm_countries() -> Sequence[OSMCountry]: async with get_http_client() as http: r = await http.get(COUNTRIES_GEOJSON_URL) diff --git a/overpass.py b/overpass.py index fc82bad..fcee475 100644 --- a/overpass.py +++ b/overpass.py @@ -1,33 +1,14 @@ -from collections.abc import Iterable, Sequence +from collections.abc import 
Sequence from datetime import UTC, datetime +from sentry_sdk import trace + from config import OVERPASS_API_URL from utils import get_http_client, retry_exponential -def _extract_center(elements: Iterable[dict]) -> None: - for e in elements: - if 'center' in e: - e |= e['center'] - del e['center'] - - -def _split_by_count(elements: Iterable[dict]) -> list[list[dict]]: - result = [] - current_split = [] - - for e in elements: - if e['type'] == 'count': - result.append(current_split) - current_split = [] - else: - current_split.append(e) - - assert not current_split, 'Last element must be count type' - return result - - @retry_exponential(None) +@trace async def query_overpass(query: str, *, timeout: int, must_return: bool = False) -> tuple[Sequence[dict], float]: join = '' if query.startswith('[') else ';' query = f'[out:json][timeout:{timeout}]{join}{query}' @@ -47,6 +28,6 @@ async def query_overpass(query: str, *, timeout: int, must_return: bool = False) ) if must_return and not data['elements']: - raise Exception('No elements returned') + raise ValueError('No elements returned') return data['elements'], data_timestamp diff --git a/planet_diffs.py b/planet_diffs.py index f375b1a..8d7644a 100644 --- a/planet_diffs.py +++ b/planet_diffs.py @@ -3,12 +3,11 @@ from collections.abc import Sequence from datetime import UTC, datetime from itertools import chain -from operator import itemgetter -import anyio import xmltodict -from anyio.streams.memory import MemoryObjectSendStream +from anyio import create_task_group, fail_after from httpx import AsyncClient +from sentry_sdk import start_span, trace from config import AED_REBUILD_THRESHOLD, PLANET_DIFF_TIMEOUT, REPLICATION_URL from utils import get_http_client, retry_exponential @@ -31,6 +30,7 @@ def _format_actions(xml: str) -> str: @retry_exponential(AED_REBUILD_THRESHOLD) +@trace async def _get_state(http: AsyncClient, sequence_number: int | None) -> tuple[int, float]: if sequence_number is None: r = await 
http.get('state.txt') @@ -50,33 +50,9 @@ async def _get_state(http: AsyncClient, sequence_number: int | None) -> tuple[in return sequence_number, sequence_timestamp -@retry_exponential(AED_REBUILD_THRESHOLD) -async def _get_planet_diff(http: AsyncClient, sequence_number: int, send_stream: MemoryObjectSendStream) -> None: - r = await http.get(f'{_format_sequence_number(sequence_number)}.osc.gz') - r.raise_for_status() - - xml_gz = r.content - xml = gzip.decompress(xml_gz).decode() - xml = _format_actions(xml) - actions: list[dict] = xmltodict.parse( - xml, - postprocessor=xmltodict_postprocessor, - force_list=('action', 'node', 'way', 'relation', 'member', 'tag', 'nd'), - )['osmChange']['action'] - - node_actions = [] - - for action in actions: - if 'node' in action: - action.pop('way', None) - action.pop('relation', None) - node_actions.append(action) - - await send_stream.send((sequence_number, tuple(node_actions))) - - +@trace async def get_planet_diffs(last_update: float) -> tuple[Sequence[dict], float]: - with anyio.fail_after(PLANET_DIFF_TIMEOUT.total_seconds()): + with fail_after(PLANET_DIFF_TIMEOUT.total_seconds()): async with get_http_client(REPLICATION_URL) as http: sequence_numbers = [] sequence_timestamps = [] @@ -94,19 +70,41 @@ async def get_planet_diffs(last_update: float) -> tuple[Sequence[dict], float]: if not sequence_numbers: return (), last_update - send_stream, receive_stream = anyio.create_memory_object_stream() - result: list[tuple[int, dict]] = [] + result: list[tuple[int, list[dict]]] = [] - async with anyio.create_task_group() as tg, send_stream, receive_stream: - for sequence_number in sequence_numbers: - tg.start_soon(_get_planet_diff, http, sequence_number, send_stream) + with start_span(description=f'Processing {len(sequence_numbers)} planet diffs'): - for _ in range(len(sequence_numbers)): - sequence_number, data = await receive_stream.receive() - result.append((sequence_number, data)) + @retry_exponential(AED_REBUILD_THRESHOLD) + 
async def _get_planet_diff(sequence_number: int) -> None: + r = await http.get(f'{_format_sequence_number(sequence_number)}.osc.gz') + r.raise_for_status() + + xml = gzip.decompress(r.content).decode() + xml = _format_actions(xml) + actions: list[dict] = xmltodict.parse( + xml, + postprocessor=xmltodict_postprocessor, + force_list=('action', 'node', 'way', 'relation', 'member', 'tag', 'nd'), + )['osmChange']['action'] + + node_actions: list[dict] = [] + + for action in actions: + # ignore everything that is not a node + if 'node' in action: + action.pop('way', None) + action.pop('relation', None) + node_actions.append(action) + + result.append((sequence_number, node_actions)) + + async with create_task_group() as tg: + for sequence_number in sequence_numbers: + tg.start_soon(_get_planet_diff, sequence_number) + + # sort by sequence number in ascending order + result.sort(key=lambda x: x[0]) - result.sort(key=itemgetter(0)) data = tuple(chain.from_iterable(data for _, data in result)) data_timestamp = sequence_timestamps[0] - return data, data_timestamp diff --git a/poetry.lock b/poetry.lock index a5ef848..de73000 100644 --- a/poetry.lock +++ b/poetry.lock @@ -13,13 +13,13 @@ files = [ [[package]] name = "anyio" -version = "4.2.0" +version = "4.3.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false python-versions = ">=3.8" files = [ - {file = "anyio-4.2.0-py3-none-any.whl", hash = "sha256:745843b39e829e108e518c489b31dc757de7d2131d53fac32bd8df268227bfee"}, - {file = "anyio-4.2.0.tar.gz", hash = "sha256:e1875bb4b4e2de1669f4bc7869b6d3f54231cdced71605e6e64c9be77e3be50f"}, + {file = "anyio-4.3.0-py3-none-any.whl", hash = "sha256:048e05d0f6caeed70d731f3db756d35dcc1f35747c8c403364a8332c630441b8"}, + {file = "anyio-4.3.0.tar.gz", hash = "sha256:f75253795a87df48568485fd18cdd2a3fa5c4f7c5be8e5e36637733fce06fed6"}, ] [package.dependencies] @@ -376,95 +376,15 @@ ssh = ["bcrypt (>=3.1.5)"] test = ["certifi", 
"pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"] test-randomorder = ["pytest-randomly"] -[[package]] -name = "cython" -version = "3.0.8" -description = "The Cython compiler for writing C extensions in the Python language." -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -files = [ - {file = "Cython-3.0.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a846e0a38e2b24e9a5c5dc74b0e54c6e29420d88d1dafabc99e0fc0f3e338636"}, - {file = "Cython-3.0.8-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:45523fdc2b78d79b32834cc1cc12dc2ca8967af87e22a3ee1bff20e77c7f5520"}, - {file = "Cython-3.0.8-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:baa0b7f3f841fe087410cab66778e2d3fb20ae2d2078a2be3dffe66c6574be39"}, - {file = "Cython-3.0.8-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e87294e33e40c289c77a135f491cd721bd089f193f956f7b8ed5aa2d0b8c558f"}, - {file = "Cython-3.0.8-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:a1df7a129344b1215c20096d33c00193437df1a8fcca25b71f17c23b1a44f782"}, - {file = "Cython-3.0.8-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:13c2a5e57a0358da467d97667297bf820b62a1a87ae47c5f87938b9bb593acbd"}, - {file = "Cython-3.0.8-cp310-cp310-win32.whl", hash = "sha256:96b028f044f5880e3cb18ecdcfc6c8d3ce9d0af28418d5ab464509f26d8adf12"}, - {file = "Cython-3.0.8-cp310-cp310-win_amd64.whl", hash = "sha256:8140597a8b5cc4f119a1190f5a2228a84f5ca6d8d9ec386cfce24663f48b2539"}, - {file = "Cython-3.0.8-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aae26f9663e50caf9657148403d9874eea41770ecdd6caf381d177c2b1bb82ba"}, - {file = "Cython-3.0.8-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:547eb3cdb2f8c6f48e6865d5a741d9dd051c25b3ce076fbca571727977b28ac3"}, - {file = "Cython-3.0.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:5a567d4b9ba70b26db89d75b243529de9e649a2f56384287533cf91512705bee"}, - {file = "Cython-3.0.8-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:51d1426263b0e82fb22bda8ea60dc77a428581cc19e97741011b938445d383f1"}, - {file = "Cython-3.0.8-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:c26daaeccda072459b48d211415fd1e5507c06bcd976fa0d5b8b9f1063467d7b"}, - {file = "Cython-3.0.8-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:289ce7838208211cd166e975865fd73b0649bf118170b6cebaedfbdaf4a37795"}, - {file = "Cython-3.0.8-cp311-cp311-win32.whl", hash = "sha256:c8aa05f5e17f8042a3be052c24f2edc013fb8af874b0bf76907d16c51b4e7871"}, - {file = "Cython-3.0.8-cp311-cp311-win_amd64.whl", hash = "sha256:000dc9e135d0eec6ecb2b40a5b02d0868a2f8d2e027a41b0fe16a908a9e6de02"}, - {file = "Cython-3.0.8-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:90d3fe31db55685d8cb97d43b0ec39ef614fcf660f83c77ed06aa670cb0e164f"}, - {file = "Cython-3.0.8-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e24791ddae2324e88e3c902a765595c738f19ae34ee66bfb1a6dac54b1833419"}, - {file = "Cython-3.0.8-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2f020fa1c0552052e0660790b8153b79e3fc9a15dbd8f1d0b841fe5d204a6ae6"}, - {file = "Cython-3.0.8-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:18bfa387d7a7f77d7b2526af69a65dbd0b731b8d941aaff5becff8e21f6d7717"}, - {file = "Cython-3.0.8-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:fe81b339cffd87c0069c6049b4d33e28bdd1874625ee515785bf42c9fdff3658"}, - {file = "Cython-3.0.8-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:80fd94c076e1e1b1ee40a309be03080b75f413e8997cddcf401a118879863388"}, - {file = "Cython-3.0.8-cp312-cp312-win32.whl", hash = "sha256:85077915a93e359a9b920280d214dc0cf8a62773e1f3d7d30fab8ea4daed670c"}, - {file = "Cython-3.0.8-cp312-cp312-win_amd64.whl", hash = 
"sha256:0cb2dcc565c7851f75d496f724a384a790fab12d1b82461b663e66605bec429a"}, - {file = "Cython-3.0.8-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:870d2a0a7e3cbd5efa65aecdb38d715ea337a904ea7bb22324036e78fb7068e7"}, - {file = "Cython-3.0.8-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7e8f2454128974905258d86534f4fd4f91d2f1343605657ecab779d80c9d6d5e"}, - {file = "Cython-3.0.8-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1949d6aa7bc792554bee2b67a9fe41008acbfe22f4f8df7b6ec7b799613a4b3"}, - {file = "Cython-3.0.8-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c9f2c6e1b8f3bcd6cb230bac1843f85114780bb8be8614855b1628b36bb510e0"}, - {file = "Cython-3.0.8-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:05d7eddc668ae7993643f32c7661f25544e791edb745758672ea5b1a82ecffa6"}, - {file = "Cython-3.0.8-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:bfabe115deef4ada5d23c87bddb11289123336dcc14347011832c07db616dd93"}, - {file = "Cython-3.0.8-cp36-cp36m-win32.whl", hash = "sha256:0c38c9f0bcce2df0c3347285863621be904ac6b64c5792d871130569d893efd7"}, - {file = "Cython-3.0.8-cp36-cp36m-win_amd64.whl", hash = "sha256:6c46939c3983217d140999de7c238c3141f56b1ea349e47ca49cae899969aa2c"}, - {file = "Cython-3.0.8-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:115f0a50f752da6c99941b103b5cb090da63eb206abbc7c2ad33856ffc73f064"}, - {file = "Cython-3.0.8-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c9c0f29246734561c90f36e70ed0506b61aa3d044e4cc4cba559065a2a741fae"}, - {file = "Cython-3.0.8-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1ab75242869ff71e5665fe5c96f3378e79e792fa3c11762641b6c5afbbbbe026"}, - {file = "Cython-3.0.8-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6717c06e9cfc6c1df18543cd31a21f5d8e378a40f70c851fa2d34f0597037abc"}, - {file = 
"Cython-3.0.8-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:9d3f74388db378a3c6fd06e79a809ed98df3f56484d317b81ee762dbf3c263e0"}, - {file = "Cython-3.0.8-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:ae7ac561fd8253a9ae96311e91d12af5f701383564edc11d6338a7b60b285a6f"}, - {file = "Cython-3.0.8-cp37-cp37m-win32.whl", hash = "sha256:97b2a45845b993304f1799664fa88da676ee19442b15fdcaa31f9da7e1acc434"}, - {file = "Cython-3.0.8-cp37-cp37m-win_amd64.whl", hash = "sha256:9e2be2b340fea46fb849d378f9b80d3c08ff2e81e2bfbcdb656e2e3cd8c6b2dc"}, - {file = "Cython-3.0.8-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:2cde23c555470db3f149ede78b518e8274853745289c956a0e06ad8d982e4db9"}, - {file = "Cython-3.0.8-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7990ca127e1f1beedaf8fc8bf66541d066ef4723ad7d8d47a7cbf842e0f47580"}, - {file = "Cython-3.0.8-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b983c8e6803f016146c26854d9150ddad5662960c804ea7f0c752c9266752f0"}, - {file = "Cython-3.0.8-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a973268d7ca1a2bdf78575e459a94a78e1a0a9bb62a7db0c50041949a73b02ff"}, - {file = "Cython-3.0.8-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:61a237bc9dd23c7faef0fcfce88c11c65d0c9bb73c74ccfa408b3a012073c20e"}, - {file = "Cython-3.0.8-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:3a3d67f079598af49e90ff9655bf85bd358f093d727eb21ca2708f467c489cae"}, - {file = "Cython-3.0.8-cp38-cp38-win32.whl", hash = "sha256:17a642bb01a693e34c914106566f59844b4461665066613913463a719e0dd15d"}, - {file = "Cython-3.0.8-cp38-cp38-win_amd64.whl", hash = "sha256:2cdfc32252f3b6dc7c94032ab744dcedb45286733443c294d8f909a4854e7f83"}, - {file = "Cython-3.0.8-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fa97893d99385386925d00074654aeae3a98867f298d1e12ceaf38a9054a9bae"}, - {file = "Cython-3.0.8-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", 
hash = "sha256:f05c0bf9d085c031df8f583f0d506aa3be1692023de18c45d0aaf78685bbb944"}, - {file = "Cython-3.0.8-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:de892422582f5758bd8de187e98ac829330ec1007bc42c661f687792999988a7"}, - {file = "Cython-3.0.8-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:314f2355a1f1d06e3c431eaad4708cf10037b5e91e4b231d89c913989d0bdafd"}, - {file = "Cython-3.0.8-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:78825a3774211e7d5089730f00cdf7f473042acc9ceb8b9eeebe13ed3a5541de"}, - {file = "Cython-3.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:df8093deabc55f37028190cf5e575c26aad23fc673f34b85d5f45076bc37ce39"}, - {file = "Cython-3.0.8-cp39-cp39-win32.whl", hash = "sha256:1aca1b97e0095b3a9a6c33eada3f661a4ed0d499067d121239b193e5ba3bb4f0"}, - {file = "Cython-3.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:16873d78be63bd38ffb759da7ab82814b36f56c769ee02b1d5859560e4c3ac3c"}, - {file = "Cython-3.0.8-py2.py3-none-any.whl", hash = "sha256:171b27051253d3f9108e9759e504ba59ff06e7f7ba944457f94deaf9c21bf0b6"}, - {file = "Cython-3.0.8.tar.gz", hash = "sha256:8333423d8fd5765e7cceea3a9985dd1e0a5dfeb2734629e1a2ed2d6233d39de6"}, -] - -[[package]] -name = "dacite" -version = "1.8.1" -description = "Simple creation of data classes from dictionaries." 
-optional = false -python-versions = ">=3.6" -files = [ - {file = "dacite-1.8.1-py3-none-any.whl", hash = "sha256:cc31ad6fdea1f49962ea42db9421772afe01ac5442380d9a99fcf3d188c61afe"}, -] - -[package.extras] -dev = ["black", "coveralls", "mypy", "pre-commit", "pylint", "pytest (>=5)", "pytest-benchmark", "pytest-cov"] - [[package]] name = "dnspython" -version = "2.6.0" +version = "2.6.1" description = "DNS toolkit" optional = false python-versions = ">=3.8" files = [ - {file = "dnspython-2.6.0-py3-none-any.whl", hash = "sha256:44c40af3bffed66e3307cea9ab667fd583e138ecc0777b18f262a9dae034e5fa"}, - {file = "dnspython-2.6.0.tar.gz", hash = "sha256:233f871ff384d84c33b2eaf4358ffe7f8927eae3b257ad8467f9bdba7e7ac6bc"}, + {file = "dnspython-2.6.1-py3-none-any.whl", hash = "sha256:5ef3b9680161f6fa89daf8ad451b5f1a33b18ae8a1c6778cdf4b43f08c0a6e50"}, + {file = "dnspython-2.6.1.tar.gz", hash = "sha256:e8f0f9c23a7b7cb99ded64e6c3a6f3e701d78f50c55e002b839dea7225cff7cc"}, ] [package.extras] @@ -1761,6 +1681,8 @@ files = [ [package.dependencies] certifi = "*" fastapi = {version = ">=0.79.0", optional = true, markers = "extra == \"fastapi\""} +httpx = {version = ">=0.16.0", optional = true, markers = "extra == \"httpx\""} +pymongo = {version = ">=3.1", optional = true, markers = "extra == \"pymongo\""} urllib3 = {version = ">=1.26.11", markers = "python_version >= \"3.6\""} [package.extras] @@ -1793,22 +1715,6 @@ starlette = ["starlette (>=0.19.1)"] starlite = ["starlite (>=1.48)"] tornado = ["tornado (>=5)"] -[[package]] -name = "setuptools" -version = "69.1.0" -description = "Easily download, build, install, upgrade, and uninstall Python packages" -optional = false -python-versions = ">=3.8" -files = [ - {file = "setuptools-69.1.0-py3-none-any.whl", hash = "sha256:c054629b81b946d63a9c6e732bc8b2513a7c3ea645f11d0139a2191d735c60c6"}, - {file = "setuptools-69.1.0.tar.gz", hash = "sha256:850894c4195f09c4ed30dba56213bf7c3f21d86ed6bdaafb5df5972593bfc401"}, -] - -[package.extras] -docs = 
["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] -testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.1)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] - [[package]] name = "shapely" version = "2.0.3" @@ -2024,13 +1930,13 @@ pytz = ["pytz (>=2023.3)"] [[package]] name = "urllib3" -version = "2.2.0" +version = "2.2.1" description = "HTTP library with thread-safe connection pooling, file post, and more." 
optional = false python-versions = ">=3.8" files = [ - {file = "urllib3-2.2.0-py3-none-any.whl", hash = "sha256:ce3711610ddce217e6d113a2732fafad960a03fd0318c91faa79481e35c11224"}, - {file = "urllib3-2.2.0.tar.gz", hash = "sha256:051d961ad0c62a94e50ecf1af379c3aba230c66c710493493560c0c223c49f20"}, + {file = "urllib3-2.2.1-py3-none-any.whl", hash = "sha256:450b20ec296a467077128bff42b73080516e71b56ff59a60a02bef2232c4fa9d"}, + {file = "urllib3-2.2.1.tar.gz", hash = "sha256:d0570876c61ab9e520d776c38acbbb5b05a776d3f9ff98a5c8fd5162a444cf19"}, ] [package.extras] @@ -2290,4 +2196,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "995d66baf744d665b6f490742c7766209980f17f648c9113b467a182032e6daa" +content-hash = "6a4ba3d523ffd5dae3d50ad696150a89b4832abc1140b1589add7b022f01652f" diff --git a/pyproject.toml b/pyproject.toml index 9a5eade..ce30b08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,13 +7,11 @@ version = "0.0.0" [tool.poetry.dependencies] anyio = "^4.2.0" -asyncache = "<1" +asyncache = "^0.3.1" authlib = "^1.3.0" beautifulsoup4 = "^4.12.3" brotlicffi = "^1.1.0.0" cachetools = "^5.3.2" -cython = "^3.0.7" -dacite = "^1.8.1" fastapi = "<1" feedgen = "^1.0.0" httpx = {extras = ["brotli", "http2"], version = "<1"} @@ -33,8 +31,7 @@ python-magic = "^0.4.27" python-multipart = "<1" pytz = "*" scikit-learn = "^1.3.2" -sentry-sdk = {extras = ["fastapi"], version = "^1.40.4"} -setuptools = "^69.0.3" +sentry-sdk = {extras = ["fastapi", "httpx", "pymongo"], version = "^1.40.4"} shapely = "^2.0.2" tqdm = "^4.66.1" tzfpy = "*" diff --git a/setup.py b/setup.py deleted file mode 100644 index 0cbec85..0000000 --- a/setup.py +++ /dev/null @@ -1,32 +0,0 @@ -import Cython.Compiler.Options as Options -from Cython.Build import cythonize -from setuptools import Extension, setup - -Options.docstrings = False -Options.annotate = True - -setup( - ext_modules=cythonize( - [ - Extension( - '*', - ['cython_lib/*.py'], - extra_compile_args=[ - 
'-march=x86-64-v2', - '-mtune=generic', - '-ffast-math', - '-fopenmp', - '-flto=auto', - ], - extra_link_args=[ - '-fopenmp', - '-flto=auto', - ], - ), - ], - compiler_directives={ - # https://cython.readthedocs.io/en/latest/src/userguide/source_files_and_compilation.html#compiler-directives - 'language_level': 3, - }, - ), -) diff --git a/shell.nix b/shell.nix index dab055c..fe02d89 100644 --- a/shell.nix +++ b/shell.nix @@ -3,7 +3,7 @@ let # Currently using nixpkgs-23.11-darwin # Update with `nixpkgs-update` command - pkgs = import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/6716c0e608eed726549fd92704b9a7a7077bdf00.tar.gz") { }; + pkgs = import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/e0da498ad77ac8909a980f07eff060862417ccf7.tar.gz") { }; libraries' = with pkgs; [ # Base libraries @@ -13,9 +13,22 @@ let zlib.out ]; + # Wrap Python to override LD_LIBRARY_PATH + wrappedPython = with pkgs; (symlinkJoin { + name = "python"; + paths = [ + # Enable Python optimizations when in production + (if isDevelopment then python312 else python312.override { enableOptimizations = true; }) + ]; + buildInputs = [ makeWrapper ]; + postBuild = '' + wrapProgram "$out/bin/python3.12" --prefix LD_LIBRARY_PATH : "${lib.makeLibraryPath libraries'}" + ''; + }); + packages' = with pkgs; [ # Base packages - python312 + wrappedPython # Scripts # -- Misc @@ -26,17 +39,8 @@ let # Development packages poetry ruff - gcc # Scripts - # -- Cython - (writeShellScriptBin "cython-build" '' - python setup.py build_ext --build-lib cython_lib - '') - (writeShellScriptBin "cython-clean" '' - rm -rf build "cython_lib/"*{.c,.html,.so} - '') - # -- Docker (dev) (writeShellScriptBin "dev-start" '' if command -v podman &> /dev/null; then docker() { podman "$@"; } fi @@ -64,7 +68,6 @@ let '') (writeShellScriptBin "docker-build" '' set -e - cython-clean && cython-build if command -v podman &> /dev/null; then docker() { podman "$@"; } fi docker load < "$(sudo nix-build --no-out-link)" 
'') @@ -75,14 +78,16 @@ let echo "Installing Python dependencies" export POETRY_VIRTUALENVS_IN_PROJECT=1 + poetry env use "${wrappedPython}/bin/python" poetry install --no-root --compile echo "Activating Python virtual environment" source .venv/bin/activate - export LD_LIBRARY_PATH="${lib.makeLibraryPath libraries'}" - # Development environment variables + export PYTHONNOUSERSITE=1 + export TZ="UTC" + if [ -f .env ]; then echo "Loading .env file" set -o allexport @@ -94,7 +99,6 @@ let ''; in pkgs.mkShell { - libraries = libraries'; buildInputs = libraries' ++ packages'; shellHook = shell'; } diff --git a/state_utils.py b/state_utils.py index 9af4dde..76b6d7f 100644 --- a/state_utils.py +++ b/state_utils.py @@ -1,9 +1,13 @@ +from sentry_sdk import trace + from config import STATE_COLLECTION +@trace async def get_state_doc(name: str, **kwargs) -> dict | None: return await STATE_COLLECTION.find_one({'_name': name}, **kwargs) +@trace async def set_state_doc(name: str, doc: dict, **kwargs) -> None: await STATE_COLLECTION.replace_one({'_name': name}, doc | {'_name': name}, upsert=True, **kwargs) diff --git a/states/aed_state.py b/states/aed_state.py index d44ae64..f05729c 100644 --- a/states/aed_state.py +++ b/states/aed_state.py @@ -1,14 +1,16 @@ from collections.abc import Iterable, Sequence from time import time -from typing import Annotated, NoReturn +from typing import NoReturn import anyio +import numpy as np from asyncache import cached from cachetools import TTLCache -from dacite import from_dict -from fastapi import Depends from pymongo import DeleteOne, ReplaceOne, UpdateOne +from sentry_sdk import start_span, start_transaction, trace +from shapely import Point from shapely.geometry import mapping +from shapely.geometry.base import BaseGeometry from sklearn.cluster import Birch from tqdm import tqdm @@ -16,19 +18,20 @@ from models.aed import AED from models.aed_group import AEDGroup from models.bbox import BBox -from models.lonlat import LonLat from overpass 
import query_overpass from planet_diffs import get_planet_diffs from state_utils import get_state_doc, set_state_doc from transaction import Transaction -from utils import as_dict, print_run_time, retry_exponential +from utils import retry_exponential +from validators.geometry import geometry_validator _AED_QUERY = 'node[emergency=defibrillator];out meta qt;' +@trace async def _should_update_db() -> tuple[bool, float]: doc = await get_state_doc('aed') - if doc is None or doc.get('version', 1) < 2: + if doc is None or doc.get('version', 1) < 3: return True, 0 update_timestamp: float = doc['update_timestamp'] @@ -39,27 +42,22 @@ async def _should_update_db() -> tuple[bool, float]: return False, update_timestamp -def _is_defibrillator(tags: dict[str, str]) -> bool: - return tags.get('emergency') == 'defibrillator' - - +@trace async def _assign_country_codes(aeds: Sequence[AED]) -> None: - from states.country_state import get_country_state - - country_state = get_country_state() + from states.country_state import CountryState if len(aeds) < 100: bulk_write_args = [] for aed in aeds: - countries = await country_state.get_countries_within(aed.position) + countries = await CountryState.get_countries_within(aed.position) country_codes = tuple({c.code for c in countries}) bulk_write_args.append(UpdateOne({'id': aed.id}, {'$set': {'country_codes': country_codes}})) else: - countries = await country_state.get_all_countries() + countries = await CountryState.get_all_countries() id_codes_map = {aed.id: set() for aed in aeds} for country in tqdm(countries, desc='📫 Iterating over countries'): - async for c in AED_COLLECTION.find( + async for doc in AED_COLLECTION.find( { '$and': [ {'id': {'$in': tuple(id_codes_map)}}, @@ -67,16 +65,12 @@ async def _assign_country_codes(aeds: Sequence[AED]) -> None: ] } ): - id_codes_map[c['id']].add(country.code) + id_codes_map[doc['id']].add(country.code) bulk_write_args = [ UpdateOne( {'id': aed.id}, - { - '$set': { - 'country_codes': 
tuple(id_codes_map[aed.id]), - } - }, + {'$set': {'country_codes': tuple(id_codes_map[aed.id])}}, ) for aed in aeds ] @@ -85,29 +79,34 @@ async def _assign_country_codes(aeds: Sequence[AED]) -> None: await AED_COLLECTION.bulk_write(bulk_write_args, ordered=False) +def _is_defibrillator(tags: dict[str, str]) -> bool: + return tags.get('emergency') == 'defibrillator' + + def _process_overpass_node(node: dict) -> AED: tags = node.get('tags', {}) is_valid = _is_defibrillator(tags) assert is_valid, 'Unexpected non-defibrillator node' return AED( id=node['id'], - position=LonLat(node['lon'], node['lat']), + position=Point(node['lon'], node['lat']), country_codes=None, tags=tags, version=node['version'], ) +@trace async def _update_db_snapshot() -> None: print('🩺 Updating aed database (overpass)...') elements, data_timestamp = await query_overpass(_AED_QUERY, timeout=3600, must_return=True) aeds = tuple(_process_overpass_node(e) for e in elements) - insert_many_arg = tuple(as_dict(aed) for aed in aeds) + insert_many_arg = tuple(aed.model_dump() for aed in aeds) async with Transaction() as s: await AED_COLLECTION.delete_many({}, session=s) await AED_COLLECTION.insert_many(insert_many_arg, session=s) - await set_state_doc('aed', {'update_timestamp': data_timestamp, 'version': 2}, session=s) + await set_state_doc('aed', {'update_timestamp': data_timestamp, 'version': 3}, session=s) if aeds: print('🩺 Updating country codes') @@ -121,25 +120,23 @@ def _parse_xml_tags(data: dict) -> dict[str, str]: return {tag['@k']: tag['@v'] for tag in tags} -def _process_action(action: dict) -> Iterable[AED | str]: - def _process_create_or_modify(node: dict) -> AED | str: +def _process_action(action: dict) -> Iterable[AED | int]: + def _process_create_or_modify(node: dict) -> AED | int: node_tags = _parse_xml_tags(node) node_valid = _is_defibrillator(node_tags) - node_id = int(node['@id']) if node_valid: return AED( - id=node_id, - position=LonLat(float(node['@lon']), float(node['@lat'])), + 
id=node['@id'], + position=Point(node['@lon'], node['@lat']), country_codes=None, tags=node_tags, - version=int(node['@version']), + version=node['@version'], ) else: - return node_id + return node['@id'] def _process_delete(node: dict) -> str: - node_id = int(node['@id']) - return node_id + return node['@id'] if action['@type'] in ('create', 'modify'): return (_process_create_or_modify(node) for node in action['node']) @@ -149,6 +146,7 @@ def _process_delete(node: dict) -> str: raise NotImplementedError(f'Unknown action type: {action["@type"]}') +@trace async def _update_db_diffs(last_update: float) -> None: print('🩺 Updating aed database (diff)...') actions, data_timestamp = await get_planet_diffs(last_update) @@ -157,8 +155,8 @@ async def _update_db_diffs(last_update: float) -> None: print('🩺 Nothing to update') return - aeds = [] - remove_ids = set() + aeds: list[AED] = [] + remove_ids: set[int] = set() for action in actions: for result in _process_action(action): @@ -167,13 +165,13 @@ async def _update_db_diffs(last_update: float) -> None: else: remove_ids.add(result) - bulk_write_arg = [ReplaceOne({'id': aed.id}, as_dict(aed), upsert=True) for aed in aeds] - bulk_write_arg += [DeleteOne({'id': remove_id}) for remove_id in remove_ids] + bulk_write_arg = [ReplaceOne({'id': aed.id}, aed.model_dump(), upsert=True) for aed in aeds] + bulk_write_arg.extend(DeleteOne({'id': remove_id}) for remove_id in remove_ids) # keep transaction as short as possible: avoid doing any computation inside async with Transaction() as s: await AED_COLLECTION.bulk_write(bulk_write_arg, ordered=True, session=s) - await set_state_doc('aed', {'update_timestamp': data_timestamp, 'version': 2}, session=s) + await set_state_doc('aed', {'update_timestamp': data_timestamp, 'version': 3}, session=s) if aeds: print('🩺 Updating country codes') @@ -183,6 +181,7 @@ async def _update_db_diffs(last_update: float) -> None: @retry_exponential(None, start=4) +@trace async def _update_db() -> None: 
update_required, update_timestamp = await _should_update_db() if not update_required: @@ -197,7 +196,8 @@ async def _update_db() -> None: class AEDState: - async def update_db_task(self, *, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn: + @staticmethod + async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn: if (await _should_update_db())[1] > 0: task_status.started() started = True @@ -205,49 +205,79 @@ async def update_db_task(self, *, task_status=anyio.TASK_STATUS_IGNORED) -> NoRe started = False while True: - await _update_db() + with start_transaction(op='update_db', name=AEDState.update_db_task.__qualname__): + await _update_db() if not started: task_status.started() started = True await anyio.sleep(AED_UPDATE_DELAY.total_seconds()) - async def update_country_codes(self) -> None: - await _assign_country_codes(await self.get_all_aeds()) + @classmethod + @trace + async def update_country_codes(cls) -> None: + await _assign_country_codes(await cls.get_all_aeds()) - @cached(TTLCache(maxsize=1024, ttl=300)) - async def count_aeds_by_country_code(self, country_code: str) -> int: + @staticmethod + @cached(TTLCache(maxsize=1024, ttl=3600)) + @trace + async def count_aeds_by_country_code(country_code: str) -> int: return await AED_COLLECTION.count_documents({'country_codes': country_code}) - async def get_all_aeds(self, filter: dict | None = None) -> Sequence[AED]: + @staticmethod + @trace + async def get_aed_by_id(aed_id: int) -> AED | None: + doc = await AED_COLLECTION.find_one({'id': aed_id}, projection={'_id': False}) + if doc is None: + return None + + doc['position'] = geometry_validator(doc['position']) + aed = AED.model_construct(**doc) + return aed + + @staticmethod + @trace + async def get_all_aeds(filter: dict | None = None) -> Sequence[AED]: cursor = AED_COLLECTION.find(filter, projection={'_id': False}) result = [] - async for c in cursor: - result.append(from_dict(AED, c)) # noqa: PERF401 - - return tuple(result) + async for 
doc in cursor: + doc['position'] = geometry_validator(doc['position']) + aed = AED.model_construct(**doc) + result.append(aed) - async def get_aeds_by_country_code(self, country_code: str) -> Sequence[AED]: - return await self.get_all_aeds({'country_codes': country_code}) + return result - async def get_aeds_within(self, bbox: BBox, group_eps: float | None) -> Sequence[AED | AEDGroup]: - return await self.get_aeds_within_geom(bbox.to_polygon(), group_eps) + @classmethod + async def get_aeds_by_country_code(cls, country_code: str) -> Sequence[AED]: + return await cls.get_all_aeds({'country_codes': country_code}) - async def get_aeds_within_geom(self, geometry, group_eps: float | None) -> Sequence[AED | AEDGroup]: - aeds = await self.get_all_aeds({'position': {'$geoIntersects': {'$geometry': mapping(geometry)}}}) + @classmethod + async def get_aeds_within_geom(cls, geometry: BaseGeometry, group_eps: float | None) -> Sequence[AED | AEDGroup]: + aeds = await cls.get_all_aeds({'position': {'$geoIntersects': {'$geometry': mapping(geometry)}}}) if len(aeds) <= 1 or group_eps is None: return aeds - result_positions = tuple(tuple(iter(aed.position)) for aed in aeds) - model = Birch(threshold=group_eps, n_clusters=None, copy=False) + positions = tuple((aed.position.x, aed.position.y) for aed in aeds) - with print_run_time(f'Clustering {len(aeds)} samples'): - clusters = model.fit_predict(result_positions) + # deterministic sampling + max_fit_samples = 7000 + if len(positions) > max_fit_samples: + indices = np.linspace(0, len(positions), max_fit_samples, endpoint=False, dtype=int) + fit_positions = np.array(positions)[indices] + else: + fit_positions = positions - with print_run_time(f'Processing {len(aeds)} samples'): - result: list[AED | AEDGroup] = [] + with start_span(description=f'Fitting model with {len(fit_positions)} samples'): + model = Birch(threshold=group_eps, n_clusters=None, copy=False) + model.fit(fit_positions) + + with start_span(description=f'Processing 
{len(aeds)} samples'): cluster_groups: tuple[list[AED]] = tuple([] for _ in range(len(model.subcluster_centers_))) + result: list[AED | AEDGroup] = [] + + with start_span(description='Clustering'): + clusters = model.predict(positions) for aed, cluster in zip(aeds, clusters, strict=True): cluster_groups[cluster].append(aed) @@ -255,31 +285,20 @@ async def get_aeds_within_geom(self, geometry, group_eps: float | None) -> Seque for group, center in zip(cluster_groups, model.subcluster_centers_, strict=True): if len(group) == 0: continue - if len(group) == 1: result.append(group[0]) continue result.append( AEDGroup( - position=LonLat(center[0], center[1]), + position=Point(center[0], center[1]), count=len(group), access=AEDGroup.decide_access(aed.access for aed in group), ) ) - return tuple(result) - - async def get_aed_by_id(self, aed_id: int) -> AED | None: - doc = await AED_COLLECTION.find_one({'id': aed_id}, projection={'_id': False}) - return from_dict(AED, doc) if doc else None - - -_instance = AEDState() - - -def get_aed_state() -> AEDState: - return _instance - + return result -AEDStateDep = Annotated[AEDState, Depends(get_aed_state)] + @classmethod + async def get_aeds_within_bbox(cls, bbox: BBox, group_eps: float | None) -> Sequence[AED | AEDGroup]: + return await cls.get_aeds_within_geom(bbox.to_polygon(), group_eps) diff --git a/states/country_state.py b/states/country_state.py index 10a9111..d455d20 100644 --- a/states/country_state.py +++ b/states/country_state.py @@ -1,23 +1,23 @@ from collections.abc import Sequence from time import time -from typing import Annotated, NoReturn +from typing import NoReturn import anyio -from dacite import from_dict -from fastapi import Depends -from shapely.geometry import Point, mapping, shape +from sentry_sdk import start_transaction, trace +from shapely.geometry import Point, mapping +from shapely.geometry.base import BaseGeometry from config import COUNTRY_COLLECTION, COUNTRY_UPDATE_DELAY from models.bbox import BBox 
-from models.country import Country, CountryLabel -from models.lonlat import LonLat +from models.country import Country from osm_countries import get_osm_countries from state_utils import get_state_doc, set_state_doc from transaction import Transaction -from utils import as_dict, retry_exponential +from utils import retry_exponential +from validators.geometry import geometry_validator -class CountryCode: +class CountryCodeAssigner: def __init__(self): self.used = set() @@ -36,6 +36,9 @@ def get_unique(self, tags: dict[str, str]) -> str: return 'XX' +shape_cache: dict[str, tuple[BaseGeometry, Point]] = {} + + def _get_names(tags: dict[str, str]) -> dict[str, str]: names = {} @@ -55,9 +58,10 @@ def _get_names(tags: dict[str, str]) -> dict[str, str]: return names +@trace async def _should_update_db() -> tuple[bool, float]: doc = await get_state_doc('country') - if doc is None: + if doc is None or doc.get('version', 1) < 2: return True, 0 update_timestamp: float = doc['update_timestamp'] @@ -69,6 +73,7 @@ async def _should_update_db() -> tuple[bool, float]: @retry_exponential(None, start=4) +@trace async def _update_db() -> None: update_required, update_timestamp = await _should_update_db() if not update_required: @@ -87,35 +92,37 @@ async def _update_db() -> None: print(f'🗺️ Not enough countries found: {len(osm_countries)})') return - country_code = CountryCode() - countries: list[Country] = [] + country_code_assigner = CountryCodeAssigner() + insert_many_arg = [] for c in osm_countries: - names = _get_names(c.tags) - code = country_code.get_unique(c.tags) - label_position = LonLat(c.representative_point.x, c.representative_point.y) - label = CountryLabel(label_position) - countries.append(Country(names, code, c.geometry, label)) - - insert_many_arg = tuple(as_dict(c) for c in countries) + country = Country( + names=_get_names(c.tags), + code=country_code_assigner.get_unique(c.tags), + geometry=c.geometry, + label_position=c.representative_point, + ) + 
insert_many_arg.append(country.model_dump()) # keep transaction as short as possible: avoid doing any computation inside async with Transaction() as s: await COUNTRY_COLLECTION.delete_many({}, session=s) await COUNTRY_COLLECTION.insert_many(insert_many_arg, session=s) - await set_state_doc('country', {'update_timestamp': data_timestamp}, session=s) + await set_state_doc('country', {'update_timestamp': data_timestamp, 'version': 2}, session=s) + + shape_cache.clear() print('🗺️ Updating country codes') - from states.aed_state import get_aed_state + from states.aed_state import AEDState - aed_state = get_aed_state() - await aed_state.update_country_codes() + await AEDState.update_country_codes() print('🗺️ Update complete') class CountryState: - async def update_db_task(self, *, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn: + @staticmethod + async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn: if (await _should_update_db())[1] > 0: task_status.started() started = True @@ -123,42 +130,46 @@ async def update_db_task(self, *, task_status=anyio.TASK_STATUS_IGNORED) -> NoRe started = False while True: - await _update_db() + with start_transaction(op='update_db', name=CountryState.update_db_task.__qualname__, sampled=True): + await _update_db() if not started: task_status.started() started = True await anyio.sleep(COUNTRY_UPDATE_DELAY.total_seconds()) - async def get_all_countries(self, filter: dict | None = None) -> Sequence[Country]: + @staticmethod + @trace + async def get_all_countries(filter: dict | None = None) -> Sequence[Country]: cursor = COUNTRY_COLLECTION.find(filter, projection={'_id': False}) result = [] - async for c in cursor: - result.append(from_dict(Country, {**c, 'geometry': shape(c['geometry'])})) # noqa: PERF401 + async for doc in cursor: + cached = shape_cache.get(doc['code']) + if cached is None: + geometry = geometry_validator(doc['geometry']) + label_position = geometry_validator(doc['label_position']) + 
shape_cache[doc['code']] = (geometry, label_position) + else: + geometry, label_position = cached + + doc['geometry'] = geometry + doc['label_position'] = label_position + result.append(Country.model_construct(**doc)) - return tuple(result) + return result - async def get_countries_within(self, bbox_or_pos: BBox | LonLat) -> Sequence[Country]: - return await self.get_all_countries( + @classmethod + async def get_countries_within(cls, bbox_or_pos: BBox | Point) -> Sequence[Country]: + return await cls.get_all_countries( { 'geometry': { '$geoIntersects': { '$geometry': mapping( bbox_or_pos.to_polygon(nodes_per_edge=8) - if isinstance(bbox_or_pos, BBox) - else Point(bbox_or_pos.lon, bbox_or_pos.lat) + if isinstance(bbox_or_pos, BBox) # + else bbox_or_pos ) } } } ) - - -_instance = CountryState() - - -def get_country_state() -> CountryState: - return _instance - - -CountryStateDep = Annotated[CountryState, Depends(get_country_state)] diff --git a/states/photo_report_state.py b/states/photo_report_state.py index 86d36a5..c59e8b2 100644 --- a/states/photo_report_state.py +++ b/states/photo_report_state.py @@ -1,22 +1,20 @@ import secrets from collections.abc import Sequence from time import time -from typing import Annotated import pymongo -from dacite import from_dict -from fastapi import Depends +from sentry_sdk import trace from config import PHOTO_REPORT_COLLECTION from models.photo_report import PhotoReport -from states.photo_state import get_photo_state -from utils import as_dict +from states.photo_state import PhotoState class PhotoReportState: - async def report_by_photo_id(self, photo_id: str) -> bool: - photo_state = get_photo_state() - photo_info = await photo_state.get_photo_by_id(photo_id) + @staticmethod + @trace + async def report_by_photo_id(photo_id: str) -> bool: + photo_info = await PhotoState.get_photo_by_id(photo_id) if photo_info is None: return False # photo not found @@ -25,35 +23,25 @@ async def report_by_photo_id(self, photo_id: str) -> bool: 
return False # already reported await PHOTO_REPORT_COLLECTION.insert_one( - as_dict( - PhotoReport( - id=secrets.token_urlsafe(16), - photo_id=photo_id, - timestamp=time(), - ) - ) + PhotoReport( + id=secrets.token_urlsafe(16), + photo_id=photo_id, + timestamp=time(), + ).model_dump() ) return True - async def get_recent_reports(self, count: int = 10) -> Sequence[PhotoReport]: + @staticmethod + @trace + async def get_recent_reports(count: int = 10) -> Sequence[PhotoReport]: cursor = ( PHOTO_REPORT_COLLECTION.find(projection={'_id': False}).sort('timestamp', pymongo.DESCENDING).limit(count) ) result = [] - async for c in cursor: - result.append(from_dict(PhotoReport, c)) # noqa: PERF401 + async for doc in cursor: + result.append(PhotoReport.model_construct(**doc)) # noqa: PERF401 return tuple(result) - - -_instance = PhotoReportState() - - -def get_photo_report_state() -> PhotoReportState: - return _instance - - -PhotoReportStateDep = Annotated[PhotoReportState, Depends(get_photo_report_state)] diff --git a/states/photo_state.py b/states/photo_state.py index 5f45c1b..514b1dc 100644 --- a/states/photo_state.py +++ b/states/photo_state.py @@ -1,18 +1,16 @@ import secrets from io import BytesIO from time import time -from typing import Annotated -import anyio -from dacite import from_dict -from fastapi import Depends, UploadFile +from fastapi import UploadFile from PIL import Image, ImageOps +from sentry_sdk import trace from config import IMAGE_LIMIT_PIXELS, IMAGE_MAX_FILE_SIZE, PHOTO_COLLECTION from models.photo_info import PhotoInfo -from utils import as_dict +@trace def _resize_image(img: Image.Image) -> Image.Image: width, height = img.size @@ -25,6 +23,7 @@ def _resize_image(img: Image.Image) -> Image.Image: return img.resize((new_width, new_height), Image.LANCZOS) +@trace def _optimize_image(img: Image.Image, format: str = 'WEBP') -> bytes: high, low = 95, 20 bs_step = 5 @@ -76,20 +75,24 @@ def _optimize_image(img: Image.Image, format: str = 'WEBP') -> bytes: 
class PhotoState: - async def get_photo_by_id(self, id: str, *, check_file: bool = True) -> PhotoInfo | None: + @staticmethod + @trace + async def get_photo_by_id(id: str, *, check_file: bool = True) -> PhotoInfo | None: doc = await PHOTO_COLLECTION.find_one({'id': id}, projection={'_id': False}) if doc is None: return None - info = from_dict(PhotoInfo, doc) + info = PhotoInfo.model_construct(**doc) if check_file and not await info.path.is_file(): return None return info - async def set_photo(self, node_id: int, user_id: int, file: UploadFile) -> PhotoInfo: + @staticmethod + @trace + async def set_photo(node_id: int, user_id: int, file: UploadFile) -> PhotoInfo: info = PhotoInfo( id=secrets.token_urlsafe(16), node_id=str(node_id), @@ -100,18 +103,8 @@ async def set_photo(self, node_id: int, user_id: int, file: UploadFile) -> Photo img = Image.open(file.file) img = ImageOps.exif_transpose(img) img = _resize_image(img) - img_bytes = await anyio.to_thread.run_sync(_optimize_image, img) + img_bytes = _optimize_image(img) await info.path.write_bytes(img_bytes) - await PHOTO_COLLECTION.insert_one(as_dict(info)) + await PHOTO_COLLECTION.insert_one(info.model_dump()) return info - - -_instance = PhotoState() - - -def get_photo_state() -> PhotoState: - return _instance - - -PhotoStateDep = Annotated[PhotoState, Depends(get_photo_state)] diff --git a/states/worker_state.py b/states/worker_state.py index 223d9e7..204bb2d 100644 --- a/states/worker_state.py +++ b/states/worker_state.py @@ -2,11 +2,9 @@ import os from datetime import timedelta from enum import Enum -from typing import Annotated import anyio import psutil -from fastapi import Depends from config import NAME from utils import retry_exponential @@ -16,7 +14,7 @@ _LOCK_PATH = anyio.Path(f'/tmp/{NAME}-worker.lock') -class WorkerStateEnum(Enum): +class WorkerStateEnum(str, Enum): STARTUP = 'startup' RUNNING = 'running' @@ -60,13 +58,3 @@ async def get_state(self) -> WorkerStateEnum: async def wait_for_state(self, 
state: WorkerStateEnum) -> None: while await self.get_state() != state: await anyio.sleep(0.1) - - -_instance = WorkerState() - - -def get_worker_state() -> WorkerState: - return _instance - - -WorkerStateDep = Annotated[WorkerState, Depends(get_worker_state)] diff --git a/utils.py b/utils.py index e4f1fd9..f1a3aa6 100644 --- a/utils.py +++ b/utils.py @@ -1,35 +1,14 @@ import functools import time import traceback -from collections.abc import Generator -from contextlib import contextmanager -from dataclasses import asdict from datetime import timedelta -from typing import Any import anyio import httpx -from shapely.geometry import mapping from config import USER_AGENT -@contextmanager -def print_run_time(message: str | list) -> Generator[None, None, None]: - start_time = time.perf_counter() - try: - yield - finally: - end_time = time.perf_counter() - elapsed_time = end_time - start_time - - # support message by reference - if isinstance(message, list): - message = message[0] - - print(f'[⏱️] {message} took {elapsed_time:.3f}s') - - def retry_exponential(timeout: timedelta | None, *, start: float = 1): timeout_seconds = float('inf') if timeout is None else timeout.total_seconds() @@ -55,7 +34,7 @@ async def wrapper(*args, **kwargs): return decorator -def get_http_client(base_url: str = '', *, auth: Any = None) -> httpx.AsyncClient: +def get_http_client(base_url: str = '', *, auth=None) -> httpx.AsyncClient: return httpx.AsyncClient( auth=auth, base_url=base_url, @@ -75,15 +54,5 @@ def abbreviate(num: int) -> str: return str(num) -def as_dict(data) -> dict: - d = asdict(data) - - for k, v in d.items(): - if hasattr(v, '__geo_interface__'): - d[k] = mapping(v) - - return d - - def get_wikimedia_commons_url(path: str) -> str: return f'https://commons.wikimedia.org/wiki/{path}' diff --git a/validators/geometry.py b/validators/geometry.py new file mode 100644 index 0000000..ff5f2e9 --- /dev/null +++ b/validators/geometry.py @@ -0,0 +1,23 @@ +from pydantic import 
PlainSerializer, PlainValidator +from shapely.geometry import mapping, shape +from shapely.geometry.base import BaseGeometry + + +def geometry_validator(value: dict | BaseGeometry) -> BaseGeometry: + """ + Validate a geometry. + """ + + return shape(value) if isinstance(value, dict) else value + + +def geometry_serializer(value: BaseGeometry) -> dict: + """ + Serialize a geometry. + """ + + return mapping(value) + + +GeometryValidator = PlainValidator(geometry_validator) +GeometrySerializer = PlainSerializer(geometry_serializer, return_type=dict) diff --git a/xmltodict_postprocessor.py b/xmltodict_postprocessor.py index bdf27b0..1f121dd 100644 --- a/xmltodict_postprocessor.py +++ b/xmltodict_postprocessor.py @@ -1,8 +1,11 @@ -def xmltodict_postprocessor(path, key, value): - if key in {'@id', '@ref', '@changeset', '@uid'}: +def xmltodict_postprocessor(_, key, value): + if key in ('@id', '@ref', '@changeset', '@uid'): return key, int(value) - if key in {'@version'}: + if key in ('@lon', '@lat'): + return key, float(value) + + if key == '@version': try: return key, int(value) except ValueError: