[Refactoring] Better encapsulation of endpoint logic into the EndpointInfo class

This change adds a class method for constructing API endpoint paths, instead of having callers compute them on their own, and normalizes the handling of slashes in endpoint and path URIs. A minor refactoring of the logging in migration_monitor.py is also included, which improves unit test code coverage.

Signed-off-by: Kartik Ganesh <[email protected]>
kartg committed Nov 3, 2023
1 parent 5beb8a4 commit aa2ac94
Showing 5 changed files with 101 additions and 61 deletions.
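
The heart of the change is the new EndpointInfo.add_path method in the first file below: the constructor normalizes the base URL to always carry a trailing slash, and add_path strips a leading slash from its argument before concatenating. A minimal sketch of the resulting behavior (the host and path are illustrative, not from the commit):

from endpoint_info import EndpointInfo

endpoint = EndpointInfo("http://localhost:9200")  # no trailing slash supplied
# The constructor appends the missing trailing slash
assert endpoint.get_url() == "http://localhost:9200/"
# add_path drops a leading slash, so both spellings join cleanly
assert endpoint.add_path("_cat/indices") == "http://localhost:9200/_cat/indices"
assert endpoint.add_path("/_cat/indices") == "http://localhost:9200/_cat/indices"
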
40 changes: 35 additions & 5 deletions FetchMigration/python/endpoint_info.py
@@ -1,8 +1,38 @@
-from dataclasses import dataclass
+from typing import Optional
 
 
-@dataclass
+# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
 class EndpointInfo:
-    url: str
-    auth: tuple = None
-    verify_ssl: bool = True
+    # Private member variables
+    __url: str
+    __auth: Optional[tuple]
+    __verify_ssl: bool
+
+    def __init__(self, url: str, auth: tuple = None, verify_ssl: bool = True):
+        self.__url = url
+        # Normalize url value to have trailing slash
+        if not url.endswith("/"):
+            self.__url += "/"
+        self.__auth = auth
+        self.__verify_ssl = verify_ssl
+
+    def __eq__(self, obj):
+        return isinstance(obj, EndpointInfo) and \
+            self.__url == obj.__url and \
+            self.__auth == obj.__auth and \
+            self.__verify_ssl == obj.__verify_ssl
+
+    def add_path(self, path: str) -> str:
+        # Remove leading slash if present
+        if path.startswith("/"):
+            path = path[1:]
+        return self.__url + path
+
+    def get_url(self) -> str:
+        return self.__url
+
+    def get_auth(self) -> Optional[tuple]:
+        return self.__auth
+
+    def is_verify_ssl(self) -> bool:
+        return self.__verify_ssl
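
Because the class now defines __eq__, endpoints compare by normalized value rather than identity. A hypothetical illustration (not part of the commit):

from endpoint_info import EndpointInfo

# Base URLs with and without a trailing slash normalize to the same value...
assert EndpointInfo("http://host:9200") == EndpointInfo("http://host:9200/")
# ...while differing auth or SSL settings break equality
assert EndpointInfo("http://host:9200") != EndpointInfo("http://host:9200", auth=("user", "pass"))
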
14 changes: 8 additions & 6 deletions FetchMigration/python/index_operations.py
@@ -13,8 +13,8 @@


 def fetch_all_indices(endpoint: EndpointInfo) -> dict:
-    actual_endpoint = endpoint.url + __ALL_INDICES_ENDPOINT
-    resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT)
+    resp = requests.get(all_indices_url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
     result = dict(resp.json())
     for index in list(result.keys()):
         # Remove system indices
@@ -31,19 +31,21 @@ def fetch_all_indices(endpoint: EndpointInfo) -> dict:

 def create_indices(indices_data: dict, endpoint: EndpointInfo):
     for index in indices_data:
-        actual_endpoint = endpoint.url + index
+        index_endpoint = endpoint.add_path(index)
         data_dict = dict()
         data_dict[SETTINGS_KEY] = indices_data[index][SETTINGS_KEY]
         data_dict[MAPPINGS_KEY] = indices_data[index][MAPPINGS_KEY]
         try:
-            resp = requests.put(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl, json=data_dict)
+            resp = requests.put(index_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
+                                json=data_dict)
             resp.raise_for_status()
         except requests.exceptions.RequestException as e:
             raise RuntimeError(f"Failed to create index [{index}] - {e!s}")
 
 
 def doc_count(indices: set, endpoint: EndpointInfo) -> int:
-    actual_endpoint = endpoint.url + ','.join(indices) + __COUNT_ENDPOINT
-    resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    count_endpoint_suffix: str = ','.join(indices) + __COUNT_ENDPOINT
+    doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
+    resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
     result = dict(resp.json())
     return int(result[COUNT_KEY])
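
With URL construction delegated to EndpointInfo, these call sites can be unit-tested without a live cluster by patching requests. A sketch under stated assumptions: the "count" response key is assumed to match COUNT_KEY (implied but not shown by this diff), and the host is made up:

from unittest.mock import patch

import index_operations
from endpoint_info import EndpointInfo

with patch("index_operations.requests.get") as mock_get:
    mock_get.return_value.json.return_value = {"count": 42}
    total = index_operations.doc_count({"movies"}, EndpointInfo("http://localhost:9200"))
    assert total == 42
    # The URL handed to requests.get starts with the normalized base URL
    assert mock_get.call_args[0][0].startswith("http://localhost:9200/")
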
26 changes: 15 additions & 11 deletions FetchMigration/python/migration_monitor.py
@@ -41,14 +41,14 @@ def shutdown_process(proc: Popen) -> Optional[int]:


 def shutdown_pipeline(endpoint: EndpointInfo):
-    shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH
-    requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    shutdown_endpoint = endpoint.add_path(__SHUTDOWN_API_PATH)
+    requests.post(shutdown_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
 
 
 def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
-    metrics_endpoint = endpoint.url + __METRICS_API_PATH
+    metrics_endpoint = endpoint.add_path(__METRICS_API_PATH)
     try:
-        response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+        response = requests.get(metrics_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
         response.raise_for_status()
     except requests.exceptions.RequestException:
         return None
@@ -95,11 +95,20 @@ def __should_continue_monitoring(metrics: ProgressMetrics, proc: Optional[Popen]
     return not metrics.is_in_terminal_state() and (proc is None or is_process_alive(proc))
 
 
+def __log_migration_end_reason(metrics: ProgressMetrics):  # pragma no cover
+    if metrics.is_migration_complete_success():
+        logging.info("Migration monitor observed successful migration, shutting down...\n")
+    elif metrics.is_migration_idle():
+        logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
+    elif metrics.is_too_may_api_failures():
+        logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
+
+
 # Last parameter is optional, and signifies a local Data Prepper process
 def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
     progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
-    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
+    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.get_target_doc_count()))
     while __should_continue_monitoring(progress_metrics, dp_process):
         if dp_process is not None:
             # Wait on local process
@@ -120,12 +129,7 @@ def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_i
             logging.error("Please delete any partially migrated indices before retrying the migration.")
         return dp_process.returncode
     else:
-        if progress_metrics.is_migration_complete_success():
-            logging.info("Migration monitor observed successful migration, shutting down...\n")
-        elif progress_metrics.is_migration_idle():
-            logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
-        elif progress_metrics.is_too_may_api_failures():
-            logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
+        __log_migration_end_reason(progress_metrics)
         # Shut down Data Prepper pipeline via API
         shutdown_pipeline(endpoint_info)
     if dp_process is None:
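
Moving the three logging branches into __log_migration_end_reason (itself excluded with pragma no cover) is what improves coverage: run() now has a single call where six conditionally executed lines used to be. If the helper needed direct exercise anyway, a hypothetical unittest sketch could look like the following; getattr is used because a bare double-underscore name written inside a TestCase body would be name-mangled:

import logging
import unittest
from unittest.mock import MagicMock

import migration_monitor


class TestEndReasonLogging(unittest.TestCase):
    def test_idle_pipeline_logs_warning(self):
        metrics = MagicMock()
        metrics.is_migration_complete_success.return_value = False
        metrics.is_migration_idle.return_value = True
        log_end_reason = getattr(migration_monitor, "__log_migration_end_reason")
        with self.assertLogs(level=logging.WARNING):
            log_end_reason(metrics)
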
60 changes: 32 additions & 28 deletions FetchMigration/python/progress_metrics.py
@@ -15,50 +15,54 @@ class ProgressMetrics:
     _REC_IN_FLIGHT_KEY = "records_in_flight"
     _NO_PART_KEY = "no_partitions"
 
-    target_doc_count: int
-    idle_threshold: int
-    current_values_map: dict[str, Optional[int]]
-    prev_values_map: dict[str, Optional[int]]
-    counter_map: dict[str, int]
+    # Private member variables
+    __target_doc_count: int
+    __idle_threshold: int
+    __current_values_map: dict[str, Optional[int]]
+    __prev_values_map: dict[str, Optional[int]]
+    __counter_map: dict[str, int]
 
     def __init__(self, doc_count, idle_threshold):
-        self.target_doc_count = doc_count
-        self.idle_threshold = idle_threshold
-        self.current_values_map = dict()
-        self.prev_values_map = dict()
-        self.counter_map = dict()
+        self.__target_doc_count = doc_count
+        self.__idle_threshold = idle_threshold
+        self.__current_values_map = dict()
+        self.__prev_values_map = dict()
+        self.__counter_map = dict()
+
+    def get_target_doc_count(self) -> int:
+        return self.__target_doc_count
 
     def __reset_counter(self, key: str):
-        if key in self.counter_map:
-            del self.counter_map[key]
+        if key in self.__counter_map:
+            del self.__counter_map[key]
 
     def __increment_counter(self, key: str):
-        val = self.counter_map.get(key, 0)
-        self.counter_map[key] = val + 1
+        val = self.__counter_map.get(key, 0)
+        self.__counter_map[key] = val + 1
 
     def __get_idle_value_key_name(self, key: str) -> str:
         return self.__IDLE_VALUE_PREFIX + key
 
     def __get_idle_value_count(self, key: str) -> Optional[int]:
         idle_value_key = self.__get_idle_value_key_name(key)
-        return self.counter_map.get(idle_value_key)
+        return self.__counter_map.get(idle_value_key)
 
     def __record_value(self, key: str, val: Optional[int]):
-        if key in self.current_values_map:
+        if key in self.__current_values_map:
             # Move current value to previous
-            self.prev_values_map[key] = self.current_values_map[key]
+            self.__prev_values_map[key] = self.__current_values_map[key]
             # Store new value
-            self.current_values_map[key] = val
+            self.__current_values_map[key] = val
             # Track idle value metrics
             idle_value_key = self.__get_idle_value_key_name(key)
-            if self.prev_values_map[key] == val:
+            if self.__prev_values_map[key] == val:
                 self.__increment_counter(idle_value_key)
             else:
                 self.__reset_counter(idle_value_key)
-        self.current_values_map[key] = val
+        self.__current_values_map[key] = val
 
     def __get_current_value(self, key: str) -> Optional[int]:
-        return self.current_values_map.get(key)
+        return self.__current_values_map.get(key)
 
     def reset_metric_api_failure(self):
         self.__reset_counter(self._METRIC_API_FAIL_KEY)
@@ -89,20 +93,20 @@ def get_doc_completion_percentage(self) -> int:
         success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
         if success_doc_count is None:
             success_doc_count = 0
-        return math.floor((success_doc_count * 100) / self.target_doc_count)
+        return math.floor((success_doc_count * 100) / self.__target_doc_count)
 
     def all_docs_migrated(self) -> bool:
         # TODO Add a check for partitionsCompleted = indices
         success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
         if success_doc_count is None:
             success_doc_count = 0
-        return success_doc_count >= self.target_doc_count
+        return success_doc_count >= self.__target_doc_count
 
     def is_migration_complete_success(self) -> bool:
         is_idle_pipeline: bool = False
         rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
         no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
-        prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
+        prev_no_partitions_count = self.__prev_values_map.get(self._NO_PART_KEY, 0)
         # Check for no records in flight
         if rec_in_flight is not None and rec_in_flight == 0:
             # No-partitions metrics should steadily tick up
@@ -114,15 +118,15 @@ def is_migration_idle(self) -> bool:
         keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
         for key in keys_to_check:
             val = self.__get_idle_value_count(key)
-            if val is not None and val >= self.idle_threshold:
+            if val is not None and val >= self.__idle_threshold:
                 logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
-                                str(self.idle_threshold))
+                                str(self.__idle_threshold))
                 return True
         # End of loop
         return False
 
     def is_too_may_api_failures(self) -> bool:
-        return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold
+        return self.__counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.__idle_threshold
 
     def is_in_terminal_state(self) -> bool:
         return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()
@@ -135,4 +139,4 @@ def log_idle_pipeline_debug_metrics(self):  # pragma no cover
"Previous no-partition value: [{2}]"
logging.debug(debug_msg_template.format(self.__get_current_value(self._REC_IN_FLIGHT_KEY),
self.__get_current_value(self._NO_PART_KEY),
self.prev_values_map.get(self._NO_PART_KEY)))
self.__prev_values_map.get(self._NO_PART_KEY)))
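
The same encapsulation pattern applies here: the target document count is now read through a getter, and the underlying attribute is name-mangled. A small illustrative check (the values are arbitrary):

from progress_metrics import ProgressMetrics

metrics = ProgressMetrics(1000, 5)
assert metrics.get_target_doc_count() == 1000
# The old public attribute is gone; the value lives under a mangled name instead
assert not hasattr(metrics, "target_doc_count")
assert hasattr(metrics, "_ProgressMetrics__target_doc_count")
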
22 changes: 11 additions & 11 deletions FetchMigration/python/tests/test_metadata_migration.py
@@ -88,26 +88,26 @@ def test_get_endpoint_info(self):
         # Simple base case
         test_config = create_plugin_config([host_input])
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertIsNone(result.auth)
-        self.assertTrue(result.verify_ssl)
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertIsNone(result.get_auth())
+        self.assertTrue(result.is_verify_ssl())
         # Invalid auth config
         test_config = create_plugin_config([host_input], test_user)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertIsNone(result.auth)
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertIsNone(result.get_auth())
         # Valid auth config
         test_config = create_plugin_config([host_input], user=test_user, password=test_password)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertEqual(test_user, result.auth[0])
-        self.assertEqual(test_password, result.auth[1])
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertEqual(test_user, result.get_auth()[0])
+        self.assertEqual(test_password, result.get_auth()[1])
         # Array of hosts uses the first entry
         test_config = create_plugin_config([host_input, "other_host"], test_user, test_password)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertEqual(test_user, result.auth[0])
-        self.assertEqual(test_password, result.auth[1])
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertEqual(test_user, result.get_auth()[0])
+        self.assertEqual(test_password, result.get_auth()[1])
 
     def test_get_index_differences_empty(self):
         # Base case should return an empty list
