OpenSearch integration improvements #139

Open · wants to merge 22 commits into base: master

Commits (22)
4964b7b
opensearch improvements
filipecosta90 May 16, 2024
fbe689e
Fixes per PR linter
filipecosta90 May 16, 2024
91d3dd0
Fixes per ruff linter
filipecosta90 May 16, 2024
acb11f1
Increase the vector limit to 16K given the latest docs
filipecosta90 May 17, 2024
e05c145
Removed the dotproduct incompatibility error given opensearch now sup…
filipecosta90 May 17, 2024
c82c5e9
Added source for IncompatibilityError on vector size
filipecosta90 May 17, 2024
d98196d
Only using basic_auth when we have opensearch login data (this allows…
filipecosta90 May 17, 2024
3fb15b9
Only using basic_auth when we have opensearch login data (this allows…
filipecosta90 May 17, 2024
0555aa4
Detecting ssl features from url on opensearch client
filipecosta90 May 17, 2024
fa3eb76
Fixed OpenSearch connection setup
filipecosta90 May 17, 2024
8ea777a
Waiting for yellow status at least on opensearch post upload stage
filipecosta90 May 17, 2024
aec4967
Fixes per PR pre-commit: isort
filipecosta90 May 17, 2024
e8b5764
Fixed forcemerge api usage on opensearch
filipecosta90 May 17, 2024
e500000
Renamed references to ES
filipecosta90 May 17, 2024
4f5937a
Added backoff strategy for search_one method on opensearch client
filipecosta90 May 17, 2024
f83bc75
Fixes per PR pre-commit: isort
filipecosta90 May 17, 2024
abd8637
Added backoff strategy for search_one method on opensearch client
filipecosta90 May 17, 2024
ae5b620
Improved index and search performance based upon docs recommendation
filipecosta90 May 19, 2024
5679a19
Collecting index stats at end of ingestion
filipecosta90 May 19, 2024
276892c
Using backoff on opensearch ingestion
filipecosta90 May 19, 2024
2c54762
Included single shard experiment for opensearch. Added backoff to pos…
filipecosta90 May 20, 2024
7292e92
Fixes per PR pre-commit: isort
filipecosta90 May 20, 2024
64 changes: 60 additions & 4 deletions engine/clients/opensearch/config.py
@@ -1,4 +1,60 @@
OPENSEARCH_PORT = 9200
OPENSEARCH_INDEX = "bench"
OPENSEARCH_USER = "opensearch"
OPENSEARCH_PASSWORD = "passwd"
import os
import time

from opensearchpy import OpenSearch

OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200))
OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "bench")
OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd")
OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300))
OPENSEARCH_BULK_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_BULK_INDEX_TIMEOUT", 3600))
OPENSEARCH_FULL_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_FULL_INDEX_TIMEOUT", 3600))
OPENSEARCH_DELETE_INDEX_TIMEOUT = int(
    os.getenv("OPENSEARCH_DELETE_INDEX_TIMEOUT", 1200)
)


def get_opensearch_client(host, connection_params):
    init_params = {
        **{
            "verify_certs": False,
            "request_timeout": OPENSEARCH_TIMEOUT,
            "retry_on_timeout": True,
            # don't show warnings about ssl certs verification
            "ssl_show_warn": False,
        },
        **connection_params,
    }
    # Enable basic auth on the OpenSearch client.
    # If the user or password is empty, we fall back to anonymous access.
    if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "":
        init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD)
    if host.startswith("https"):
        init_params["use_ssl"] = True
    else:
        init_params["use_ssl"] = False
    if host.startswith("http"):
        url = ""
    else:
        url = "http://"
    url += f"{host}:{OPENSEARCH_PORT}"
    client = OpenSearch(
        url,
        **init_params,
    )
    assert client.ping()
    return client


def _wait_for_es_status(client, status="yellow"):
    print(f"waiting for OpenSearch cluster health {status} status...")
    for _ in range(100):
        try:
            client.cluster.health(wait_for_status=status)
            return client
        except ConnectionError:
            time.sleep(0.1)
    else:
        # timeout: the loop never reached the requested status
        raise Exception("OpenSearch failed to start.")
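
For orientation, a short usage sketch of the new helpers above (the hosts are hypothetical; the behavior follows the code in this diff):

from engine.clients.opensearch.config import (
    _wait_for_es_status,
    get_opensearch_client,
)

# No scheme given: "http://" is prepended and use_ssl stays False.
client = get_opensearch_client("localhost", {})

# "https" prefix: use_ssl is enabled; basic auth is attached only when both
# OPENSEARCH_USER and OPENSEARCH_PASSWORD are non-empty.
secure_client = get_opensearch_client("https://opensearch.example.com", {})

# Block until the cluster reports at least yellow health.
_wait_for_es_status(client, status="yellow")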
49 changes: 25 additions & 24 deletions engine/clients/opensearch/configure.py
@@ -1,21 +1,21 @@
from opensearchpy import NotFoundError, OpenSearch
from opensearchpy import NotFoundError

from benchmark.dataset import Dataset
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.opensearch.config import (
    OPENSEARCH_DELETE_INDEX_TIMEOUT,
    OPENSEARCH_INDEX,
    OPENSEARCH_PASSWORD,
    OPENSEARCH_PORT,
    OPENSEARCH_USER,
    get_opensearch_client,
)


class OpenSearchConfigurator(BaseConfigurator):
    DISTANCE_MAPPING = {
        Distance.L2: "l2",
        Distance.COSINE: "cosinesimil",
        # innerproduct (supported for Lucene in OpenSearch version 2.13 and later)
        Distance.DOT: "innerproduct",
    }
    INDEX_TYPE_MAPPING = {
@@ -25,44 +25,45 @@ class OpenSearchConfigurator(BaseConfigurator):

    def __init__(self, host, collection_params: dict, connection_params: dict):
        super().__init__(host, collection_params, connection_params)
        init_params = {
            **{
                "verify_certs": False,
                "request_timeout": 90,
                "retry_on_timeout": True,
            },
            **connection_params,
        }
        self.client = OpenSearch(
            f"http://{host}:{OPENSEARCH_PORT}",
            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
            **init_params,
        )
        self.client = get_opensearch_client(host, connection_params)

    def clean(self):
        try:
            self.client.indices.delete(
                index=OPENSEARCH_INDEX,
                params={
                    "timeout": 300,
                    "timeout": OPENSEARCH_DELETE_INDEX_TIMEOUT,
                },
            )
        except NotFoundError:
            pass

    def recreate(self, dataset: Dataset, collection_params):
        if dataset.config.distance == Distance.DOT:
            raise IncompatibilityError
        if dataset.config.vector_size > 1024:
        # The knn_vector data type supports a vector of floats that can have a dimension count of up to 16,000 for the NMSLIB, Faiss, and Lucene engines, as set by the dimension mapping parameter.
        # Source: https://opensearch.org/docs/latest/search-plugins/knn/approximate-knn/
        if dataset.config.vector_size > 16000:
            raise IncompatibilityError

        index_settings = (
            {
                "knn": True,
                "number_of_replicas": 0,
                "refresh_interval": -1,  # no refresh is required because we index all the data at once
            },
        )
        index_config = collection_params.get("index")

        # If we specify number_of_shards in the config, enforce it; otherwise use the default.
        if "number_of_shards" in index_config:
            index_settings["number_of_shards"] = 1
Comment on lines +57 to +58

"Tuples don't support item assignment"

I suggest:

    index_settings = {
        "knn": True,
        "number_of_replicas": 0,
        "refresh_interval": -1,  # no refresh is required because we index all the data at once
    }
    index_config = collection_params.get("index")

    # if we specify the number_of_shards on the config, enforce it. otherwise use the default
    if "number_of_shards" in index_config:
        index_settings["number_of_shards"] = 1
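
For reference, a minimal standalone reproduction of the reviewer's point (a hypothetical snippet, not part of the PR):

# The trailing comma after the dict turns index_settings into a one-element tuple.
index_settings = (
    {"knn": True, "number_of_replicas": 0, "refresh_interval": -1},
)
# Tuples are immutable, so item assignment fails with
# TypeError: 'tuple' object does not support item assignment
index_settings["number_of_shards"] = 1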


        # Followed the below link for tuning ingestion and querying:
        # https://opensearch.org/docs/1.1/search-plugins/knn/performance-tuning/#indexing-performance-tuning
        self.client.indices.create(
            index=OPENSEARCH_INDEX,
            body={
                "settings": {
                    "index": {
                        "knn": True,
                    }
                    "index": index_settings,
                },
                "mappings": {
                    "properties": {
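The mappings section is truncated in the diff view above. For context, a complete OpenSearch k-NN index creation call looks roughly like the following; the field name, dimension, and HNSW method parameters here are illustrative assumptions, not the PR's exact values:

# A hedged sketch of an OpenSearch k-NN index definition; "vector", the
# dimension, and the HNSW parameters are assumptions for illustration.
client.indices.create(
    index="bench",
    body={
        "settings": {
            "index": {
                "knn": True,
                "number_of_replicas": 0,
                "refresh_interval": -1,  # re-enabled later in post_upload
            }
        },
        "mappings": {
            "properties": {
                "vector": {
                    "type": "knn_vector",
                    "dimension": 128,  # must match the dataset's vector size
                    "method": {
                        "name": "hnsw",
                        "space_type": "innerproduct",
                        "engine": "lucene",
                    },
                }
            }
        },
    },
)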
34 changes: 17 additions & 17 deletions engine/clients/opensearch/search.py
@@ -2,15 +2,16 @@
import uuid
from typing import List, Tuple

import backoff
from opensearchpy import OpenSearch
from opensearchpy.exceptions import TransportError

from dataset_reader.base_reader import Query
from engine.base_client.search import BaseSearcher
from engine.clients.opensearch.config import (
    OPENSEARCH_INDEX,
    OPENSEARCH_PASSWORD,
    OPENSEARCH_PORT,
    OPENSEARCH_USER,
    OPENSEARCH_TIMEOUT,
    get_opensearch_client,
)
from engine.clients.opensearch.parser import OpenSearchConditionParser

@@ -31,22 +32,21 @@ def get_mp_start_method(cls):

    @classmethod
    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
        init_params = {
            **{
                "verify_certs": False,
                "request_timeout": 90,
                "retry_on_timeout": True,
            },
            **connection_params,
        }
        cls.client: OpenSearch = OpenSearch(
            f"http://{host}:{OPENSEARCH_PORT}",
            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
            **init_params,
        )
        cls.client = get_opensearch_client(host, connection_params)
        cls.search_params = search_params

    def _search_backoff_handler(details):
        print(
            f"Backing off OpenSearch query for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
        )

    @classmethod
    @backoff.on_exception(
        backoff.expo,
        TransportError,
        max_time=OPENSEARCH_TIMEOUT,
        on_backoff=_search_backoff_handler,
    )
    def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
        opensearch_query = {
            "knn": {
@@ -73,7 +73,7 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
"size": top,
},
params={
"timeout": 60,
"timeout": OPENSEARCH_TIMEOUT,
},
)
return [
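As background on the retry strategy introduced in this file: backoff.on_exception retries the decorated call with exponentially growing waits until it succeeds or max_time elapses, invoking the handler before each sleep. A self-contained sketch of the same pattern (the flaky function is hypothetical, not part of the PR):

import backoff
from opensearchpy.exceptions import TransportError

def _handler(details):
    # Same shape as the handlers in this PR: report wait time, attempt count, and cause.
    print(
        f"Backing off {details['wait']:.1f}s after {details['tries']} tries due to {details['exception']}"
    )

@backoff.on_exception(backoff.expo, TransportError, max_time=60, on_backoff=_handler)
def flaky_call():
    # Hypothetical stand-in for search_one; raises until the cluster recovers.
    raise TransportError(503, "service_unavailable", "cluster is busy")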
67 changes: 46 additions & 21 deletions engine/clients/opensearch/upload.py
@@ -2,15 +2,18 @@
import uuid
from typing import List

import backoff
from opensearchpy import OpenSearch
from opensearchpy.exceptions import TransportError

from dataset_reader.base_reader import Record
from engine.base_client.upload import BaseUploader
from engine.clients.opensearch.config import (
    OPENSEARCH_BULK_INDEX_TIMEOUT,
    OPENSEARCH_FULL_INDEX_TIMEOUT,
    OPENSEARCH_INDEX,
    OPENSEARCH_PASSWORD,
    OPENSEARCH_PORT,
    OPENSEARCH_USER,
    _wait_for_es_status,
    get_opensearch_client,
)


@@ -29,22 +32,26 @@ def get_mp_start_method(cls):

    @classmethod
    def init_client(cls, host, distance, connection_params, upload_params):
        init_params = {
            **{
                "verify_certs": False,
                "request_timeout": 90,
                "retry_on_timeout": True,
            },
            **connection_params,
        }
        cls.client = OpenSearch(
            f"http://{host}:{OPENSEARCH_PORT}",
            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
            **init_params,
        )
        cls.client = get_opensearch_client(host, connection_params)
        cls.upload_params = upload_params

    def _upload_backoff_handler(details):
        print(
            f"Backing off OpenSearch bulk upload for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
        )

    def _index_backoff_handler(details):
        print(
            f"Backing off OpenSearch indexing for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
        )

    @classmethod
    @backoff.on_exception(
        backoff.expo,
        TransportError,
        max_time=OPENSEARCH_FULL_INDEX_TIMEOUT,
        on_backoff=_upload_backoff_handler,
    )
    def upload_batch(cls, batch: List[Record]):
        operations = []
        for record in batch:
@@ -56,16 +63,34 @@ def upload_batch(cls, batch: List[Record]):
            index=OPENSEARCH_INDEX,
            body=operations,
            params={
                "timeout": 300,
                "timeout": OPENSEARCH_BULK_INDEX_TIMEOUT,
            },
        )

    @classmethod
    @backoff.on_exception(
        backoff.expo,
        TransportError,
        max_time=OPENSEARCH_FULL_INDEX_TIMEOUT,
        on_backoff=_index_backoff_handler,
    )
    def post_upload(cls, _distance):
        cls.client.indices.forcemerge(
        print(
            "Updated the index settings back to the default and waiting for indexing to be completed."
        )
        # Update the index settings back to the default
        refresh_interval = "1s"
        cls.client.indices.put_settings(
            index=OPENSEARCH_INDEX,
            params={
                "timeout": 300,
            },
            body={"index": {"refresh_interval": refresh_interval}},
Comment on lines +81 to +85

Is cls.client.indices.refresh(index=OPENSEARCH_INDEX) better?

Contributor Author:

I believe it's best as is, meaning:

  • we disable refresh during indexing
  • we re-enable it once indexing is done
        )
        _wait_for_es_status(cls.client)
        return {}

    def get_memory_usage(cls):
        index_stats = cls.client.indices.stats(index=OPENSEARCH_INDEX)
        size_in_bytes = index_stats["_all"]["primaries"]["store"]["size_in_bytes"]
        return {
            "size_in_bytes": size_in_bytes,
            "index_info": index_stats,
        }
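
Putting the refresh discussion together, the ingestion flow this PR describes is: disable refresh before bulk indexing, restore it afterwards, then wait for cluster health. A condensed sketch under those assumptions ("bench" and the operations payload are placeholders):

# Disable refresh while bulk-indexing; no searches need fresh segments yet.
client.indices.put_settings(index="bench", body={"index": {"refresh_interval": -1}})

# Ingest the data in bulk batches (operations is a placeholder payload).
client.bulk(index="bench", body=operations)

# Restore the default refresh interval and wait for the cluster to settle.
client.indices.put_settings(index="bench", body={"index": {"refresh_interval": "1s"}})
client.cluster.health(wait_for_status="yellow")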