This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Merge pull request opensearch-project#434 from kartg/fetch-timeout-handling

[Fetch Migration] Improved request error handling
sumobrian authored Nov 16, 2023
2 parents e76841e + 7dd6f66 commit 1d1375c
Showing 8 changed files with 145 additions and 48 deletions.
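The common thread across these files is a single request-handling pattern: every call to the cluster gets an explicit timeout, and low-level requests exceptions are re-raised as RuntimeError with the endpoint for context. A minimal standalone sketch of that pattern, using illustrative names (TIMEOUT_SECONDS, get_json) rather than the modules' private identifiers:

import requests

TIMEOUT_SECONDS = 10  # mirrors the value added in index_operations.py


def get_json(url: str) -> dict:
    try:
        resp = requests.get(url, timeout=TIMEOUT_SECONDS)
        resp.raise_for_status()
        return resp.json()
    except requests.Timeout:
        # A retry mechanism is still a TODO in the diff below
        raise RuntimeError(f"Timed out on GET request to cluster endpoint: {url}")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"GET request failure to cluster endpoint: {url} - {e!s}")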
1 change: 0 additions & 1 deletion FetchMigration/python/endpoint_utils.py
@@ -98,7 +98,6 @@ def validate_auth(plugin_name: str, plugin_config: dict):
if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
# Raises a ValueError if region cannot be derived
get_aws_region(plugin_config)
return
# Validate basic auth
elif USER_KEY not in plugin_config:
raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name)
12 changes: 9 additions & 3 deletions FetchMigration/python/fetch_orchestrator.py
@@ -137,9 +137,15 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
arg_parser.add_argument("--createonly", "-c", action="store_true",
help="Skips data migration and only creates indices on the target cluster")
cli_args = arg_parser.parse_args()
params = FetchOrchestratorParams(os.path.expandvars(cli_args.data_prepper_path),
os.path.expandvars(cli_args.pipeline_file_path),
port=cli_args.port, insecure=cli_args.insecure,
dp_path = os.path.expandvars(cli_args.data_prepper_path)
if not os.path.isdir(dp_path):
raise ValueError("Path to Data Prepper installation is not a directory")
elif not os.path.exists(dp_path + __DP_EXECUTABLE_SUFFIX):
raise ValueError(f"Could not find {__DP_EXECUTABLE_SUFFIX} executable under Data Prepper install location")
pipeline_file = os.path.expandvars(cli_args.pipeline_file_path)
if not os.path.exists(pipeline_file):
raise ValueError("Data Prepper pipeline file does not exist")
params = FetchOrchestratorParams(dp_path, pipeline_file, port=cli_args.port, insecure=cli_args.insecure,
dryrun=cli_args.dryrun, create_only=cli_args.createonly)
return_code = run(params)
if return_code == 0:
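For context, the new pre-flight checks amount to a small validation step before FetchOrchestratorParams is built. The sketch below is illustrative only: the real checks run inline in the argument-parsing block, and DP_EXECUTABLE_SUFFIX stands in for the module's private __DP_EXECUTABLE_SUFFIX, whose value is not shown in this hunk.

import os

DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"  # assumed value, for illustration only


def validate_inputs(dp_path: str, pipeline_file: str) -> None:
    # Fail fast with a clear message instead of failing later when Data Prepper is launched
    if not os.path.isdir(dp_path):
        raise ValueError("Path to Data Prepper installation is not a directory")
    if not os.path.exists(dp_path + DP_EXECUTABLE_SUFFIX):
        raise ValueError(f"Could not find {DP_EXECUTABLE_SUFFIX} executable under Data Prepper install location")
    if not os.path.exists(pipeline_file):
        raise ValueError("Data Prepper pipeline file does not exist")

Both CLI paths pass through os.path.expandvars first, so environment variables in the arguments are expanded before these checks run.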
84 changes: 56 additions & 28 deletions FetchMigration/python/index_operations.py
@@ -7,14 +7,15 @@
# compatible open source license.
#

from typing import Optional

import jsonpath_ng
import requests

from endpoint_info import EndpointInfo

# Constants
from index_doc_count import IndexDocCount

# Constants
SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
ALIASES_KEY = "aliases"
@@ -28,26 +29,49 @@
__BUCKET_INDEX_NAME_KEY = "key"
__BUCKET_DOC_COUNT_KEY = "doc_count"
__INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"]
__TIMEOUT_SECONDS = 10


def __send_get_request(url: str, endpoint: EndpointInfo, payload: Optional[dict] = None) -> requests.Response:
try:
resp = requests.get(url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(), json=payload,
timeout=__TIMEOUT_SECONDS)
resp.raise_for_status()
return resp
except requests.ConnectionError:
raise RuntimeError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}")
except requests.HTTPError as e:
raise RuntimeError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()} - {e!s}")
except requests.Timeout:
# TODO retry mechanism
raise RuntimeError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}")
except requests.exceptions.RequestException as e:
raise RuntimeError(f"GET request failure to cluster endpoint: {endpoint.get_url()} - {e!s}")


def fetch_all_indices(endpoint: EndpointInfo) -> dict:
all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT)
resp = requests.get(all_indices_url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
result = dict(resp.json())
for index in list(result.keys()):
# Remove system indices
if index.startswith("."):
del result[index]
# Remove internal settings
else:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result
try:
# raises RuntimeError in case of any request errors
resp = __send_get_request(all_indices_url, endpoint)
result = dict(resp.json())
for index in list(result.keys()):
# Remove system indices
if index.startswith("."):
del result[index]
# Remove internal settings
else:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch metadata from cluster endpoint: {e!s}")


def create_indices(indices_data: dict, endpoint: EndpointInfo):
def create_indices(indices_data: dict, endpoint: EndpointInfo) -> dict:
failed_indices = dict()
for index in indices_data:
index_endpoint = endpoint.add_path(index)
data_dict = dict()
@@ -57,22 +81,26 @@ def create_indices(indices_data: dict, endpoint: EndpointInfo):
data_dict[ALIASES_KEY] = indices_data[index][ALIASES_KEY]
try:
resp = requests.put(index_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
json=data_dict)
json=data_dict, timeout=__TIMEOUT_SECONDS)
resp.raise_for_status()
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Failed to create index [{index}] - {e!s}")
failed_indices[index] = e
# Loop completed, return failures if any
return failed_indices


def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH
doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
json=__SEARCH_COUNT_PAYLOAD)
# TODO Handle resp.status_code for non successful requests
result = dict(resp.json())
total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value
counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value
count_map = dict()
for entry in counts_list:
count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY]
return IndexDocCount(total, count_map)
try:
# raises RuntimeError in case of any request errors
resp = __send_get_request(doc_count_endpoint, endpoint, __SEARCH_COUNT_PAYLOAD)
result = dict(resp.json())
total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value
counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value
count_map = dict()
for entry in counts_list:
count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY]
return IndexDocCount(total, count_map)
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch doc_count: {e!s}")
8 changes: 7 additions & 1 deletion FetchMigration/python/metadata_migration.py
@@ -97,7 +97,13 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
index_data = dict()
for index_name in diff.indices_to_create:
index_data[index_name] = source_indices[index_name]
index_operations.create_indices(index_data, target_endpoint_info)
failed_indices = index_operations.create_indices(index_data, target_endpoint_info)
fail_count = len(failed_indices)
if fail_count > 0:
logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
for failed_index_name, error in failed_indices.items():
logging.error(f"Index name {failed_index_name} failed: {error!s}")
raise RuntimeError("Metadata migration failed, index creation unsuccessful")
return result


7 changes: 5 additions & 2 deletions FetchMigration/python/migration_monitor.py
@@ -30,6 +30,7 @@
__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
__IDLE_THRESHOLD: int = 5
__TIMEOUT_SECONDS: int = 5


def is_process_alive(proc: Popen) -> bool:
@@ -51,13 +52,15 @@ def shutdown_process(proc: Popen) -> Optional[int]:

def shutdown_pipeline(endpoint: EndpointInfo):
shutdown_endpoint = endpoint.add_path(__SHUTDOWN_API_PATH)
requests.post(shutdown_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
requests.post(shutdown_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
timeout=__TIMEOUT_SECONDS)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
metrics_endpoint = endpoint.add_path(__METRICS_API_PATH)
try:
response = requests.get(metrics_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
response = requests.get(metrics_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
timeout=__TIMEOUT_SECONDS)
response.raise_for_status()
except requests.exceptions.RequestException:
return None
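Note that requests.Timeout is a subclass of requests.exceptions.RequestException, so the timeout newly applied to the metrics call is absorbed by the existing handler and surfaces as a None return from fetch_prometheus_metrics rather than an exception. A quick self-contained check of that hierarchy:

import requests

# Both of these are caught by the monitor's except clause, including the new timeout case
assert issubclass(requests.Timeout, requests.exceptions.RequestException)
assert issubclass(requests.ConnectionError, requests.exceptions.RequestException)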
49 changes: 38 additions & 11 deletions FetchMigration/python/tests/test_index_operations.py
@@ -53,7 +53,8 @@ def test_create_indices(self):
match=[matchers.json_params_matcher(test_data[test_constants.INDEX2_NAME])])
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX3_NAME,
match=[matchers.json_params_matcher(test_data[test_constants.INDEX3_NAME])])
index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
self.assertEqual(0, len(failed))

@responses.activate
def test_create_indices_empty_alias(self):
@@ -67,21 +68,28 @@ def test_create_indices_empty_alias(self):
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX1_NAME,
match=[matchers.json_params_matcher(expected_payload)])
# Empty "aliases" should be stripped
index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
self.assertEqual(0, len(failed))
# Index without "aliases" should not fail
del test_data[test_constants.INDEX1_NAME][aliases_key]
index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
self.assertEqual(0, len(failed))

@responses.activate
def test_create_indices_exception(self):
# Set up expected PUT calls with a mock response status
test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA)
del test_data[test_constants.INDEX2_NAME]
del test_data[test_constants.INDEX3_NAME]
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX1_NAME,
def test_create_indices_exceptions(self):
# Set up second index to hit an exception
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX2_NAME,
body=requests.Timeout())
self.assertRaises(RuntimeError, index_operations.create_indices, test_data,
EndpointInfo(test_constants.TARGET_ENDPOINT))
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX1_NAME,
json={})
responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX3_NAME,
json={})
failed_indices = index_operations.create_indices(test_constants.BASE_INDICES_DATA,
EndpointInfo(test_constants.TARGET_ENDPOINT))
# Verify that failed indices are returned with their respective errors
self.assertEqual(1, len(failed_indices))
self.assertTrue(test_constants.INDEX2_NAME in failed_indices)
self.assertTrue(isinstance(failed_indices[test_constants.INDEX2_NAME], requests.Timeout))

@responses.activate
def test_doc_count(self):
@@ -99,6 +107,25 @@ def test_doc_count(self):
doc_count_result = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
self.assertEqual(total_docs, doc_count_result.total)

@responses.activate
def test_doc_count_error(self):
test_indices = {test_constants.INDEX1_NAME, test_constants.INDEX2_NAME}
expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_search?size=0"
responses.get(expected_count_endpoint, body=requests.Timeout())
self.assertRaises(RuntimeError, index_operations.doc_count, test_indices,
EndpointInfo(test_constants.SOURCE_ENDPOINT))

@responses.activate
def test_get_request_errors(self):
# Set up list of error types to test
test_errors = [requests.ConnectionError(), requests.HTTPError(), requests.Timeout(),
requests.exceptions.MissingSchema()]
# Verify that each error is wrapped in a RuntimeError
for e in test_errors:
responses.get(test_constants.SOURCE_ENDPOINT + "*", body=e)
self.assertRaises(RuntimeError, index_operations.fetch_all_indices,
EndpointInfo(test_constants.SOURCE_ENDPOINT))


if __name__ == '__main__':
unittest.main()
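The new negative-path tests rely on the responses library raising an exception instance that is passed as body= when the mocked URL is hit. A minimal, self-contained illustration of that mechanism, with a placeholder URL:

import requests
import responses


@responses.activate
def demo_mocked_timeout() -> None:
    # When body is an exception instance, responses raises it from the matching request
    responses.get("http://example.com/_search", body=requests.Timeout())
    try:
        requests.get("http://example.com/_search", timeout=1)
    except requests.Timeout:
        print("mocked timeout surfaced as expected")


demo_mocked_timeout()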
28 changes: 28 additions & 0 deletions FetchMigration/python/tests/test_metadata_migration.py
@@ -8,10 +8,13 @@
#

import copy
import logging
import pickle
import unittest
from unittest.mock import patch, MagicMock, ANY

import requests

import metadata_migration
from index_doc_count import IndexDocCount
from metadata_migration_params import MetadataMigrationParams
@@ -21,9 +24,13 @@
class TestMetadataMigration(unittest.TestCase):
# Run before each test
def setUp(self) -> None:
logging.disable(logging.CRITICAL)
with open(test_constants.PIPELINE_CONFIG_PICKLE_FILE_PATH, "rb") as f:
self.loaded_pipeline_config = pickle.load(f)

def tearDown(self) -> None:
logging.disable(logging.NOTSET)

@patch('index_operations.doc_count')
@patch('metadata_migration.write_output')
@patch('metadata_migration.print_report')
@@ -145,6 +152,27 @@ def test_no_indices_in_source(self, mock_fetch_indices: MagicMock):
self.assertEqual(0, test_result.target_doc_count)
self.assertEqual(0, len(test_result.migration_indices))

@patch('metadata_migration.write_output')
@patch('index_operations.doc_count')
@patch('index_operations.create_indices')
@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_failed_indices(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock,
mock_doc_count: MagicMock, mock_write_output: MagicMock):
mock_doc_count.return_value = IndexDocCount(1, dict())
# Setup failed indices
test_failed_indices_result = {
test_constants.INDEX1_NAME: requests.Timeout(),
test_constants.INDEX2_NAME: requests.ConnectionError(),
test_constants.INDEX3_NAME: requests.Timeout()
}
mock_create_indices.return_value = test_failed_indices_result
# Fetch indices is called first for source, then for target
mock_fetch_indices.side_effect = [test_constants.BASE_INDICES_DATA, {}]
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
self.assertRaises(RuntimeError, metadata_migration.run, test_input)
mock_create_indices.assert_called_once_with(test_constants.BASE_INDICES_DATA, ANY)


if __name__ == '__main__':
unittest.main()
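The "passed bottom-up" comment in the new test reflects standard unittest.mock behavior: the decorator closest to the function supplies the first mock argument. A tiny standalone illustration:

import os
from unittest.mock import MagicMock, patch


@patch("os.path.isdir")   # outermost patch -> last mock argument
@patch("os.path.exists")  # innermost patch -> first mock argument
def demo(mock_exists: MagicMock, mock_isdir: MagicMock) -> None:
    os.path.exists("/tmp")
    mock_exists.assert_called_once_with("/tmp")
    mock_isdir.assert_not_called()


demo()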
4 changes: 2 additions & 2 deletions FetchMigration/python/tests/test_migration_monitor.py
@@ -44,7 +44,7 @@ def test_shutdown_pipeline(self, mock_post: MagicMock):
expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
migration_monitor.shutdown_pipeline(test_endpoint)
mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)
mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG, timeout=ANY)

@patch('requests.get')
def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
@@ -57,7 +57,7 @@ def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
mock_get.return_value = mock_response
# Test fetch
raw_metrics_list = migration_monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
mock_get.assert_called_once_with(expected_url, auth=None, verify=True)
mock_get.assert_called_once_with(expected_url, auth=None, verify=True, timeout=ANY)
self.assertEqual(1, len(raw_metrics_list))
test_metric = raw_metrics_list[0]
self.assertEqual(TEST_METRIC_NAME, test_metric.name)
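ANY in the updated assertions is unittest.mock.ANY, which compares equal to any value, so the tests verify that a timeout keyword is now passed without pinning its exact value. A short illustration with placeholder arguments:

from unittest.mock import ANY, MagicMock

mock_post = MagicMock()
mock_post("http://localhost:4900/shutdown", timeout=5)
# The assertion passes regardless of the timeout value actually used
mock_post.assert_called_once_with("http://localhost:4900/shutdown", timeout=ANY)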
