Skip to content

Commit

Permalink
Merge pull request #152 from StijnCaerts/es-alias
Browse files Browse the repository at this point in the history
Use Elasticsearch aliases on indices
  • Loading branch information
jonhealy1 authored Nov 7, 2023
2 parents 8fc2d0d + eade0b4 commit 23dca8a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Examples folder with example docker setup for running sfes from pip [#147](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/147)

### Changed

- Use aliases on Elasticsearch indices, add number suffix in index name. [#152](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/152)

### Fixed

- Corrected the closing of client connections in ES index management functions [#132](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/132)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
":",
}

DEFAULT_INDICES = f"*,-*kibana*,-{COLLECTIONS_INDEX}"
ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*"

DEFAULT_SORT = {
"properties.datetime": {"order": "desc"},
Expand Down Expand Up @@ -164,7 +164,7 @@ def indices(collection_ids: Optional[List[str]]) -> str:
A string of comma-separated index names. If `collection_ids` is None, returns the default indices.
"""
if collection_ids is None:
return DEFAULT_INDICES
return ITEM_INDICES
else:
return ",".join([index_by_collection_id(c) for c in collection_ids])

Expand All @@ -178,7 +178,8 @@ async def create_collection_index() -> None:
client = AsyncElasticsearchSettings().create_client

await client.indices.create(
index=COLLECTIONS_INDEX,
index=f"{COLLECTIONS_INDEX}-000001",
aliases={COLLECTIONS_INDEX: {}},
mappings=ES_COLLECTIONS_MAPPINGS,
ignore=400, # ignore 400 already exists code
)
Expand All @@ -197,9 +198,11 @@ async def create_item_index(collection_id: str):
"""
client = AsyncElasticsearchSettings().create_client
index_name = index_by_collection_id(collection_id)

await client.indices.create(
index=index_by_collection_id(collection_id),
index=f"{index_by_collection_id(collection_id)}-000001",
aliases={index_name: {}},
mappings=ES_ITEMS_MAPPINGS,
settings=ES_ITEMS_SETTINGS,
ignore=400, # ignore 400 already exists code
Expand All @@ -215,7 +218,14 @@ async def delete_item_index(collection_id: str):
"""
client = AsyncElasticsearchSettings().create_client

await client.indices.delete(index=index_by_collection_id(collection_id))
name = index_by_collection_id(collection_id)
resolved = await client.indices.resolve_index(name=name)
if "aliases" in resolved and resolved["aliases"]:
[alias] = resolved["aliases"]
await client.indices.delete_alias(index=alias["indices"], name=alias["name"])
await client.indices.delete(index=alias["indices"])
else:
await client.indices.delete(index=name)
await client.close()


Expand Down Expand Up @@ -773,14 +783,11 @@ async def bulk_async(
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
index is refreshed after the bulk insert. The function does not return any value.
"""
await asyncio.get_event_loop().run_in_executor(
None,
lambda: helpers.bulk(
self.sync_client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
),
await helpers.async_bulk(
self.client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
)

def bulk_sync(
Expand Down Expand Up @@ -811,7 +818,7 @@ def bulk_sync(
async def delete_items(self) -> None:
"""Danger. this is only for tests."""
await self.client.delete_by_query(
index=DEFAULT_INDICES,
index=ITEM_INDICES,
body={"query": {"match_all": {}}},
wait_for_completion=True,
)
Expand Down

0 comments on commit 23dca8a

Please sign in to comment.