Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collection pagination #164

Merged
merged 15 commits into from
Nov 7, 2023
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
services:

elasticsearch_8_svc:
image: docker.elastic.co/elasticsearch/elasticsearch:8.1.3
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.4
env:
cluster.name: stac-cluster
node.name: es01
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Collection-level Assets to the CollectionSerializer [#148](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/148)
- Pagination for the `/collections` (GET all collections) route [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164)

### Added

- Examples folder with example docker setup for running sfes from pip [#147](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/147)

### Changed

- Updated the Elasticsearch version from 8.1.3 to 8.10.4 in the CI/CD GitHub Actions workflow [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164)

### Fixed

- Corrected the closing of client connections in ES index management functions [#132](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/132)
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,17 @@ curl -X "POST" "http://localhost:8080/collections" \
```

Note: this "Collections Transaction" behavior is not part of the STAC API, but may be soon.


## Collection pagination

The `/collections` route accepts optional `limit` (default 10) and `token` query parameters. When a
page is full (it contains `limit` collections), the `links` field of the response includes a `next`
link whose `token` can be used to request the next page of results.

```shell
curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token"
```

## Testing

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ services:

elasticsearch:
container_name: es-container
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.1.3}
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.10.4}
environment:
ES_JAVA_OPTS: -Xms512m -Xmx1g
volumes:
Expand Down
67 changes: 48 additions & 19 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Item crud client."""
import json
import logging
from base64 import urlsafe_b64encode
from datetime import datetime as datetime_type
from datetime import timezone
from typing import Any, Dict, List, Optional, Set, Type, Union
Expand Down Expand Up @@ -80,30 +81,58 @@ async def all_collections(self, **kwargs) -> Collections:
Raises:
Exception: If any error occurs while reading the collections from the database.
"""
request: Request = kwargs["request"]
base_url = str(kwargs["request"].base_url)

limit = (
int(request.query_params["limit"])
if "limit" in request.query_params
else 10
)
token = (
request.query_params["token"] if "token" in request.query_params else None
)

hits = await self.database.get_all_collections(limit=limit, token=token)

next_search_after = None
next_link = None
if len(hits) == limit:
last_hit = hits[-1]
next_search_after = last_hit["sort"]
next_token = urlsafe_b64encode(
",".join(map(str, next_search_after)).encode()
).decode()
paging_links = PagingLinks(next=next_token, request=request)
next_link = paging_links.link_next()

links = [
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
]

if next_link:
links.append(next_link)

return Collections(
collections=[
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in await self.database.get_all_collections()
],
links=[
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
self.collection_serializer.db_to_stac(c["_source"], base_url=base_url)
for c in hits
],
links=links,
)

@overrides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,21 +295,34 @@ class DatabaseLogic:

"""CORE LOGIC"""

async def get_all_collections(
    self, token: Optional[str] = None, limit: int = 10
) -> Iterable[Dict[str, Any]]:
    """Retrieve one page of collections from the database.

    Args:
        token (Optional[str]): URL-safe base64 token encoding the comma-joined
            sort values of the last hit of the previous page. When omitted or
            empty, the first page is returned.
        limit (int): Maximum number of results to return. Defaults to 10.

    Returns:
        collections (Iterable[Dict[str, Any]]): The raw search hits, not just
            their source documents. Callers read the collection document from
            `hit["_source"]` and the paging cursor from `hit["sort"]`.

    Raises:
        binascii.Error: If `token` is not valid base64.
        UnicodeDecodeError: If the decoded token is not valid UTF-8.

    Notes:
        The collections are retrieved using the `client.search` method, with
        `COLLECTIONS_INDEX` as the target index, `size=limit`, and an ascending
        sort on `id` so that `search_after` can resume from the previous page.
    """
    search_after = None
    if token:
        # The token is the inverse of what the /collections route builds:
        # base64(",".join(sort_values)).
        # NOTE(review): a sort value that itself contains "," would be split
        # into several values here -- confirm collection ids cannot contain one.
        search_after = urlsafe_b64decode(token.encode()).decode().split(",")

    response = await self.client.search(
        index=COLLECTIONS_INDEX,
        search_after=search_after,
        size=limit,
        sort={"id": {"order": "asc"}},
    )
    return response["hits"]["hits"]

async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
"""Retrieve a single item from the database.
Expand Down
26 changes: 26 additions & 0 deletions stac_fastapi/elasticsearch/tests/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import uuid
from datetime import datetime, timedelta

import pytest

from ..conftest import create_collection, create_item

ROUTES = {
Expand Down Expand Up @@ -31,17 +33,20 @@
}


@pytest.mark.asyncio
async def test_post_search_content_type(app_client, ctx):
    """POST /search must answer with the GeoJSON media type."""
    response = await app_client.post("/search", json={"limit": 1})
    content_type = response.headers["content-type"]
    assert content_type == "application/geo+json"


@pytest.mark.asyncio
async def test_get_search_content_type(app_client, ctx):
    """GET /search must answer with the GeoJSON media type."""
    response = await app_client.get("/search")
    content_type = response.headers["content-type"]
    assert content_type == "application/geo+json"


@pytest.mark.asyncio
async def test_api_headers(app_client):
resp = await app_client.get("/api")
assert (
Expand All @@ -50,11 +55,13 @@ async def test_api_headers(app_client):
assert resp.status_code == 200


@pytest.mark.asyncio
async def test_router(app):
    """Every route registered on the app must be listed in ``ROUTES``."""
    # Each route is keyed as "<METHOD> <path>", using the first of its methods.
    api_routes = {f"{list(route.methods)[0]} {route.path}" for route in app.routes}
    unexpected = api_routes - ROUTES
    # Name the offending routes so a failure is actionable.
    assert not unexpected, f"Routes missing from ROUTES: {sorted(unexpected)}"


@pytest.mark.asyncio
async def test_app_transaction_extension(app_client, ctx):
item = copy.deepcopy(ctx.item)
item["id"] = str(uuid.uuid4())
Expand All @@ -64,6 +71,7 @@ async def test_app_transaction_extension(app_client, ctx):
await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}")


@pytest.mark.asyncio
async def test_app_search_response(app_client, ctx):
resp = await app_client.get("/search", params={"ids": ["test-item"]})
assert resp.status_code == 200
Expand All @@ -75,6 +83,7 @@ async def test_app_search_response(app_client, ctx):
assert resp_json.get("stac_extensions") is None


@pytest.mark.asyncio
async def test_app_context_extension(app_client, ctx, txn_client):
test_item = ctx.item
test_item["id"] = "test-item-2"
Expand Down Expand Up @@ -108,13 +117,15 @@ async def test_app_context_extension(app_client, ctx, txn_client):
assert matched == 1


@pytest.mark.asyncio
async def test_app_fields_extension(app_client, ctx, txn_client):
    """Searching the test collection yields features exposing only ``datetime``."""
    query = {"collections": ["test-collection"]}
    response = await app_client.get("/search", params=query)
    assert response.status_code == 200

    first_feature = response.json()["features"][0]
    assert list(first_feature["properties"]) == ["datetime"]


@pytest.mark.asyncio
async def test_app_fields_extension_query(app_client, ctx, txn_client):
resp = await app_client.post(
"/search",
Expand All @@ -128,6 +139,7 @@ async def test_app_fields_extension_query(app_client, ctx, txn_client):
assert list(resp_json["features"][0]["properties"]) == ["datetime", "proj:epsg"]


@pytest.mark.asyncio
async def test_app_fields_extension_no_properties_get(app_client, ctx, txn_client):
resp = await app_client.get(
"/search", params={"collections": ["test-collection"], "fields": "-properties"}
Expand All @@ -137,6 +149,7 @@ async def test_app_fields_extension_no_properties_get(app_client, ctx, txn_clien
assert "properties" not in resp_json["features"][0]


@pytest.mark.asyncio
async def test_app_fields_extension_no_properties_post(app_client, ctx, txn_client):
resp = await app_client.post(
"/search",
Expand All @@ -150,6 +163,7 @@ async def test_app_fields_extension_no_properties_post(app_client, ctx, txn_clie
assert "properties" not in resp_json["features"][0]


@pytest.mark.asyncio
async def test_app_fields_extension_return_all_properties(app_client, ctx, txn_client):
item = ctx.item
resp = await app_client.get(
Expand All @@ -166,6 +180,7 @@ async def test_app_fields_extension_return_all_properties(app_client, ctx, txn_c
assert feature["properties"][expected_prop] == expected_value


@pytest.mark.asyncio
async def test_app_query_extension_gt(app_client, ctx):
params = {"query": {"proj:epsg": {"gt": ctx.item["properties"]["proj:epsg"]}}}
resp = await app_client.post("/search", json=params)
Expand All @@ -174,6 +189,7 @@ async def test_app_query_extension_gt(app_client, ctx):
assert len(resp_json["features"]) == 0


@pytest.mark.asyncio
async def test_app_query_extension_gte(app_client, ctx):
params = {"query": {"proj:epsg": {"gte": ctx.item["properties"]["proj:epsg"]}}}
resp = await app_client.post("/search", json=params)
Expand All @@ -182,22 +198,26 @@ async def test_app_query_extension_gte(app_client, ctx):
assert len(resp.json()["features"]) == 1


@pytest.mark.asyncio
async def test_app_query_extension_limit_lt0(app_client):
    """A negative search limit must be rejected with HTTP 400."""
    response = await app_client.post("/search", json={"limit": -1})
    assert response.status_code == 400


@pytest.mark.asyncio
async def test_app_query_extension_limit_gt10000(app_client):
    """A limit above 10000 is accepted and reported as 10000 in the context."""
    response = await app_client.post("/search", json={"limit": 10001})
    assert response.status_code == 200

    context = response.json()["context"]
    assert context["limit"] == 10000


@pytest.mark.asyncio
async def test_app_query_extension_limit_10000(app_client):
    """A search limit of exactly 10000 must be accepted."""
    response = await app_client.post("/search", json={"limit": 10000})
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_app_sort_extension(app_client, txn_client, ctx):
first_item = ctx.item
item_date = datetime.strptime(
Expand All @@ -223,6 +243,7 @@ async def test_app_sort_extension(app_client, txn_client, ctx):
assert resp_json["features"][1]["id"] == second_item["id"]


@pytest.mark.asyncio
async def test_search_invalid_date(app_client, ctx):
params = {
"datetime": "2020-XX-01/2020-10-30",
Expand All @@ -233,6 +254,7 @@ async def test_search_invalid_date(app_client, ctx):
assert resp.status_code == 400


@pytest.mark.asyncio
async def test_search_point_intersects(app_client, ctx):
point = [150.04, -33.14]
intersects = {"type": "Point", "coordinates": point}
Expand All @@ -248,6 +270,7 @@ async def test_search_point_intersects(app_client, ctx):
assert len(resp_json["features"]) == 1


@pytest.mark.asyncio
async def test_search_point_does_not_intersect(app_client, ctx):
point = [15.04, -3.14]
intersects = {"type": "Point", "coordinates": point}
Expand All @@ -263,6 +286,7 @@ async def test_search_point_does_not_intersect(app_client, ctx):
assert len(resp_json["features"]) == 0


@pytest.mark.asyncio
async def test_datetime_non_interval(app_client, ctx):
dt_formats = [
"2020-02-12T12:30:22+00:00",
Expand All @@ -284,6 +308,7 @@ async def test_datetime_non_interval(app_client, ctx):
assert resp_json["features"][0]["properties"]["datetime"][0:19] == dt[0:19]


@pytest.mark.asyncio
async def test_bbox_3d(app_client, ctx):
australia_bbox = [106.343365, -47.199523, 0.1, 168.218365, -19.437288, 0.1]
params = {
Expand All @@ -296,6 +321,7 @@ async def test_bbox_3d(app_client, ctx):
assert len(resp_json["features"]) == 1


@pytest.mark.asyncio
async def test_search_line_string_intersects(app_client, ctx):
line = [[150.04, -33.14], [150.22, -33.89]]
intersects = {"type": "LineString", "coordinates": line}
Expand Down
Loading