Skip to content

Commit

Permalink
More traces, Fix aed count cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Zaczero committed Feb 19, 2024
1 parent 1460fd5 commit 4267dfd
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 15 deletions.
4 changes: 3 additions & 1 deletion api/v1/tile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
from anyio import create_task_group
from fastapi import APIRouter, Path, Response
from sentry_sdk import start_span
from sentry_sdk import start_span, trace
from sentry_sdk.tracing import Span
from shapely.ops import transform

Expand Down Expand Up @@ -95,6 +95,7 @@ def _mvt_encode(bbox: BBox, data: Sequence[dict]) -> bytes:
return mvt.encode(data, default_options={'extents': MVT_EXTENT})


@trace
async def _get_tile_country(z: int, bbox: BBox) -> bytes:
countries = await CountryState.get_countries_within(bbox)
country_count_map: dict[str, str] = {}
Expand Down Expand Up @@ -144,6 +145,7 @@ async def count_task(country: Country) -> None:
)


@trace
async def _get_tile_aed(z: int, bbox: BBox) -> bytes:
group_eps = 9.8 / 2**z if z < TILE_MAX_Z else None
aeds = await AEDState.get_aeds_within_bbox(bbox.extend(0.5), group_eps)
Expand Down
2 changes: 1 addition & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pyproj import Transformer

NAME = 'openaedmap-backend'
VERSION = '2.7.0'
VERSION = '2.7.0.240219'
CREATED_BY = f'{NAME} {VERSION}'
WEBSITE = 'https://openaedmap.org'

Expand Down
1 change: 1 addition & 0 deletions planet_diffs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def _format_actions(xml: str) -> str:


@retry_exponential(AED_REBUILD_THRESHOLD)
@trace
async def _get_state(http: AsyncClient, sequence_number: int | None) -> tuple[int, float]:
if sequence_number is None:
r = await http.get('state.txt')
Expand Down
27 changes: 26 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ version = "0.0.0"

[tool.poetry.dependencies]
anyio = "^4.2.0"
asyncache = "^0.3.1"
authlib = "^1.3.0"
beautifulsoup4 = "^4.12.3"
brotlicffi = "^1.1.0.0"
cachetools = "^5.3.2"
dacite = "^1.8.1"
fastapi = "<1"
feedgen = "^1.0.0"
Expand Down
4 changes: 4 additions & 0 deletions state_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from sentry_sdk import trace

from config import STATE_COLLECTION


@trace
async def get_state_doc(name: str, **kwargs) -> dict | None:
return await STATE_COLLECTION.find_one({'_name': name}, **kwargs)


@trace
async def set_state_doc(name: str, doc: dict, **kwargs) -> None:
await STATE_COLLECTION.replace_one({'_name': name}, doc | {'_name': name}, upsert=True, **kwargs)
24 changes: 15 additions & 9 deletions states/aed_state.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from collections.abc import Iterable, Sequence
from functools import lru_cache
from time import time
from typing import NoReturn

import anyio
from asyncache import cached
from cachetools import TTLCache
from dacite import from_dict
from pymongo import DeleteOne, ReplaceOne, UpdateOne
from sentry_sdk import start_span, trace
from sentry_sdk import start_span, start_transaction, trace
from sentry_sdk.tracing import Span
from shapely.geometry import mapping
from sklearn.cluster import Birch
Expand All @@ -26,6 +27,7 @@
_AED_QUERY = 'node[emergency=defibrillator];out meta qt;'


@trace
async def _should_update_db() -> tuple[bool, float]:
doc = await get_state_doc('aed')
if doc is None or doc.get('version', 1) < 2:
Expand All @@ -39,10 +41,7 @@ async def _should_update_db() -> tuple[bool, float]:
return False, update_timestamp


def _is_defibrillator(tags: dict[str, str]) -> bool:
return tags.get('emergency') == 'defibrillator'


@trace
async def _assign_country_codes(aeds: Sequence[AED]) -> None:
from states.country_state import CountryState

Expand Down Expand Up @@ -83,6 +82,10 @@ async def _assign_country_codes(aeds: Sequence[AED]) -> None:
await AED_COLLECTION.bulk_write(bulk_write_args, ordered=False)


def _is_defibrillator(tags: dict[str, str]) -> bool:
return tags.get('emergency') == 'defibrillator'


def _process_overpass_node(node: dict) -> AED:
tags = node.get('tags', {})
is_valid = _is_defibrillator(tags)
Expand All @@ -96,6 +99,7 @@ def _process_overpass_node(node: dict) -> AED:
)


@trace
async def _update_db_snapshot() -> None:
print('🩺 Updating aed database (overpass)...')
elements, data_timestamp = await query_overpass(_AED_QUERY, timeout=3600, must_return=True)
Expand Down Expand Up @@ -147,6 +151,7 @@ def _process_delete(node: dict) -> str:
raise NotImplementedError(f'Unknown action type: {action["@type"]}')


@trace
async def _update_db_diffs(last_update: float) -> None:
print('🩺 Updating aed database (diff)...')
actions, data_timestamp = await get_planet_diffs(last_update)
Expand Down Expand Up @@ -181,6 +186,7 @@ async def _update_db_diffs(last_update: float) -> None:


@retry_exponential(None, start=4)
@trace
async def _update_db() -> None:
update_required, update_timestamp = await _should_update_db()
if not update_required:
Expand All @@ -196,7 +202,6 @@ async def _update_db() -> None:

class AEDState:
@staticmethod
@trace
async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn:
if (await _should_update_db())[1] > 0:
task_status.started()
Expand All @@ -205,7 +210,8 @@ async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn:
started = False

while True:
await _update_db()
with start_transaction(op='update_db', name=AEDState.update_db_task.__qualname__):
await _update_db()
if not started:
task_status.started()
started = True
Expand All @@ -216,8 +222,8 @@ async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn:
async def update_country_codes(cls) -> None:
await _assign_country_codes(await cls.get_all_aeds())

@lru_cache(maxsize=1024)
@staticmethod
@cached(TTLCache(maxsize=1024, ttl=3600))
@trace
async def count_aeds_by_country_code(country_code: str) -> int:
return await AED_COLLECTION.count_documents({'country_codes': country_code})
Expand Down
8 changes: 5 additions & 3 deletions states/country_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import anyio
from dacite import from_dict
from sentry_sdk import trace
from sentry_sdk import start_transaction, trace
from shapely.geometry import Point, mapping, shape

from config import COUNTRY_COLLECTION, COUNTRY_UPDATE_DELAY
Expand Down Expand Up @@ -55,6 +55,7 @@ def _get_names(tags: dict[str, str]) -> dict[str, str]:
return names


@trace
async def _should_update_db() -> tuple[bool, float]:
doc = await get_state_doc('country')
if doc is None:
Expand All @@ -69,6 +70,7 @@ async def _should_update_db() -> tuple[bool, float]:


@retry_exponential(None, start=4)
@trace
async def _update_db() -> None:
update_required, update_timestamp = await _should_update_db()
if not update_required:
Expand Down Expand Up @@ -115,7 +117,6 @@ async def _update_db() -> None:

class CountryState:
@staticmethod
@trace
async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn:
if (await _should_update_db())[1] > 0:
task_status.started()
Expand All @@ -124,7 +125,8 @@ async def update_db_task(*, task_status=anyio.TASK_STATUS_IGNORED) -> NoReturn:
started = False

while True:
await _update_db()
with start_transaction(op='update_db', name=CountryState.update_db_task.__qualname__, sampled=True):
await _update_db()
if not started:
task_status.started()
started = True
Expand Down
2 changes: 2 additions & 0 deletions states/photo_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from utils import as_dict


@trace
def _resize_image(img: Image.Image) -> Image.Image:
width, height = img.size

Expand All @@ -24,6 +25,7 @@ def _resize_image(img: Image.Image) -> Image.Image:
return img.resize((new_width, new_height), Image.LANCZOS)


@trace
def _optimize_image(img: Image.Image, format: str = 'WEBP') -> bytes:
high, low = 95, 20
bs_step = 5
Expand Down

0 comments on commit 4267dfd

Please sign in to comment.