diff --git a/FetchMigration/python/endpoint_info.py b/FetchMigration/python/endpoint_info.py
index b77880dc0..a9cce2b13 100644
--- a/FetchMigration/python/endpoint_info.py
+++ b/FetchMigration/python/endpoint_info.py
@@ -1,8 +1,38 @@
-from dataclasses import dataclass
+from typing import Optional
 
 
-@dataclass
+# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
 class EndpointInfo:
-    url: str
-    auth: tuple = None
-    verify_ssl: bool = True
+    # Private member variables
+    __url: str
+    __auth: Optional[tuple]
+    __verify_ssl: bool
+
+    def __init__(self, url: str, auth: tuple = None, verify_ssl: bool = True):
+        self.__url = url
+        # Normalize url value to have trailing slash
+        if not url.endswith("/"):
+            self.__url += "/"
+        self.__auth = auth
+        self.__verify_ssl = verify_ssl
+
+    def __eq__(self, obj):
+        return isinstance(obj, EndpointInfo) and \
+            self.__url == obj.__url and \
+            self.__auth == obj.__auth and \
+            self.__verify_ssl == obj.__verify_ssl
+
+    def add_path(self, path: str) -> str:
+        # Remove leading slash if present
+        if path.startswith("/"):
+            path = path[1:]
+        return self.__url + path
+
+    def get_url(self) -> str:
+        return self.__url
+
+    def get_auth(self) -> Optional[tuple]:
+        return self.__auth
+
+    def is_verify_ssl(self) -> bool:
+        return self.__verify_ssl
diff --git a/FetchMigration/python/index_operations.py b/FetchMigration/python/index_operations.py
index e273eea70..a311ec7e6 100644
--- a/FetchMigration/python/index_operations.py
+++ b/FetchMigration/python/index_operations.py
@@ -13,8 +13,8 @@
 
 
 def fetch_all_indices(endpoint: EndpointInfo) -> dict:
-    actual_endpoint = endpoint.url + __ALL_INDICES_ENDPOINT
-    resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT)
+    resp = requests.get(all_indices_url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
     result = dict(resp.json())
     for index in list(result.keys()):
         # Remove system indices
@@ -31,19 +31,21 @@ def fetch_all_indices(endpoint: EndpointInfo) -> dict:
 
 def create_indices(indices_data: dict, endpoint: EndpointInfo):
     for index in indices_data:
-        actual_endpoint = endpoint.url + index
+        index_endpoint = endpoint.add_path(index)
         data_dict = dict()
         data_dict[SETTINGS_KEY] = indices_data[index][SETTINGS_KEY]
         data_dict[MAPPINGS_KEY] = indices_data[index][MAPPINGS_KEY]
         try:
-            resp = requests.put(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl, json=data_dict)
+            resp = requests.put(index_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
+                                json=data_dict)
             resp.raise_for_status()
         except requests.exceptions.RequestException as e:
             raise RuntimeError(f"Failed to create index [{index}] - {e!s}")
 
 
 def doc_count(indices: set, endpoint: EndpointInfo) -> int:
-    actual_endpoint = endpoint.url + ','.join(indices) + __COUNT_ENDPOINT
-    resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    count_endpoint_suffix: str = ','.join(indices) + __COUNT_ENDPOINT
+    doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
+    resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
     result = dict(resp.json())
     return int(result[COUNT_KEY])
diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py
index 9e62ce40f..d4fec39d8 100644
--- a/FetchMigration/python/migration_monitor.py
+++ b/FetchMigration/python/migration_monitor.py
@@ -41,14 +41,14 @@ def shutdown_process(proc: Popen) -> Optional[int]:
 
 
 def shutdown_pipeline(endpoint: EndpointInfo):
-    shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH
-    requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+    shutdown_endpoint = endpoint.add_path(__SHUTDOWN_API_PATH)
+    requests.post(shutdown_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
 
 
 def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
-    metrics_endpoint = endpoint.url + __METRICS_API_PATH
+    metrics_endpoint = endpoint.add_path(__METRICS_API_PATH)
     try:
-        response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
+        response = requests.get(metrics_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
         response.raise_for_status()
     except requests.exceptions.RequestException:
         return None
@@ -95,11 +95,20 @@ def __should_continue_monitoring(metrics: ProgressMetrics, proc: Optional[Popen]
     return not metrics.is_in_terminal_state() and (proc is None or is_process_alive(proc))
 
 
+def __log_migration_end_reason(metrics: ProgressMetrics):  # pragma no cover
+    if metrics.is_migration_complete_success():
+        logging.info("Migration monitor observed successful migration, shutting down...\n")
+    elif metrics.is_migration_idle():
+        logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
+    elif metrics.is_too_may_api_failures():
+        logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
+
+
 # Last parameter is optional, and signifies a local Data Prepper process
 def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
     progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
-    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
+    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.get_target_doc_count()))
     while __should_continue_monitoring(progress_metrics, dp_process):
         if dp_process is not None:
             # Wait on local process
@@ -120,12 +129,7 @@ def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_i
         logging.error("Please delete any partially migrated indices before retrying the migration.")
         return dp_process.returncode
     else:
-        if progress_metrics.is_migration_complete_success():
-            logging.info("Migration monitor observed successful migration, shutting down...\n")
-        elif progress_metrics.is_migration_idle():
-            logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
-        elif progress_metrics.is_too_may_api_failures():
-            logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
+        __log_migration_end_reason(progress_metrics)
         # Shut down Data Prepper pipeline via API
         shutdown_pipeline(endpoint_info)
         if dp_process is None:
diff --git a/FetchMigration/python/progress_metrics.py b/FetchMigration/python/progress_metrics.py
index 4c271caf9..f135b4b08 100644
--- a/FetchMigration/python/progress_metrics.py
+++ b/FetchMigration/python/progress_metrics.py
@@ -15,50 +15,54 @@ class ProgressMetrics:
     _REC_IN_FLIGHT_KEY = "records_in_flight"
     _NO_PART_KEY = "no_partitions"
 
-    target_doc_count: int
-    idle_threshold: int
-    current_values_map: dict[str, Optional[int]]
-    prev_values_map: dict[str, Optional[int]]
-    counter_map: dict[str, int]
+    # Private member variables
+    __target_doc_count: int
+    __idle_threshold: int
+    __current_values_map: dict[str, Optional[int]]
+    __prev_values_map: dict[str, Optional[int]]
+    __counter_map: dict[str, int]
 
     def __init__(self, doc_count, idle_threshold):
-        self.target_doc_count = doc_count
-        self.idle_threshold = idle_threshold
-        self.current_values_map = dict()
-        self.prev_values_map = dict()
-        self.counter_map = dict()
+        self.__target_doc_count = doc_count
+        self.__idle_threshold = idle_threshold
+        self.__current_values_map = dict()
+        self.__prev_values_map = dict()
+        self.__counter_map = dict()
+
+    def get_target_doc_count(self) -> int:
+        return self.__target_doc_count
 
     def __reset_counter(self, key: str):
-        if key in self.counter_map:
-            del self.counter_map[key]
+        if key in self.__counter_map:
+            del self.__counter_map[key]
 
     def __increment_counter(self, key: str):
-        val = self.counter_map.get(key, 0)
-        self.counter_map[key] = val + 1
+        val = self.__counter_map.get(key, 0)
+        self.__counter_map[key] = val + 1
 
     def __get_idle_value_key_name(self, key: str) -> str:
         return self.__IDLE_VALUE_PREFIX + key
 
     def __get_idle_value_count(self, key: str) -> Optional[int]:
         idle_value_key = self.__get_idle_value_key_name(key)
-        return self.counter_map.get(idle_value_key)
+        return self.__counter_map.get(idle_value_key)
 
     def __record_value(self, key: str, val: Optional[int]):
-        if key in self.current_values_map:
+        if key in self.__current_values_map:
             # Move current value to previous
-            self.prev_values_map[key] = self.current_values_map[key]
+            self.__prev_values_map[key] = self.__current_values_map[key]
             # Store new value
-            self.current_values_map[key] = val
+            self.__current_values_map[key] = val
             # Track idle value metrics
             idle_value_key = self.__get_idle_value_key_name(key)
-            if self.prev_values_map[key] == val:
+            if self.__prev_values_map[key] == val:
                 self.__increment_counter(idle_value_key)
             else:
                 self.__reset_counter(idle_value_key)
-        self.current_values_map[key] = val
+        self.__current_values_map[key] = val
 
     def __get_current_value(self, key: str) -> Optional[int]:
-        return self.current_values_map.get(key)
+        return self.__current_values_map.get(key)
 
     def reset_metric_api_failure(self):
         self.__reset_counter(self._METRIC_API_FAIL_KEY)
@@ -89,20 +93,20 @@ def get_doc_completion_percentage(self) -> int:
         success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
         if success_doc_count is None:
             success_doc_count = 0
-        return math.floor((success_doc_count * 100) / self.target_doc_count)
+        return math.floor((success_doc_count * 100) / self.__target_doc_count)
 
     def all_docs_migrated(self) -> bool:
         # TODO Add a check for partitionsCompleted = indices
         success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
         if success_doc_count is None:
             success_doc_count = 0
-        return success_doc_count >= self.target_doc_count
+        return success_doc_count >= self.__target_doc_count
 
     def is_migration_complete_success(self) -> bool:
         is_idle_pipeline: bool = False
         rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
         no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
-        prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
+        prev_no_partitions_count = self.__prev_values_map.get(self._NO_PART_KEY, 0)
         # Check for no records in flight
         if rec_in_flight is not None and rec_in_flight == 0:
             # No-partitions metrics should steadily tick up
@@ -114,15 +118,15 @@ def is_migration_idle(self) -> bool:
         keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
         for key in keys_to_check:
             val = self.__get_idle_value_count(key)
-            if val is not None and val >= self.idle_threshold:
+            if val is not None and val >= self.__idle_threshold:
                 logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
-                                str(self.idle_threshold))
+                                str(self.__idle_threshold))
                 return True
         # End of loop
         return False
 
     def is_too_may_api_failures(self) -> bool:
-        return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold
+        return self.__counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.__idle_threshold
 
     def is_in_terminal_state(self) -> bool:
         return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()
@@ -135,4 +139,4 @@ def log_idle_pipeline_debug_metrics(self):  # pragma no cover
                              "Previous no-partition value: [{2}]"
         logging.debug(debug_msg_template.format(self.__get_current_value(self._REC_IN_FLIGHT_KEY),
                                                 self.__get_current_value(self._NO_PART_KEY),
-                                                self.prev_values_map.get(self._NO_PART_KEY)))
+                                                self.__prev_values_map.get(self._NO_PART_KEY)))
diff --git a/FetchMigration/python/tests/test_metadata_migration.py b/FetchMigration/python/tests/test_metadata_migration.py
index 21af41d1e..b2d8e2894 100644
--- a/FetchMigration/python/tests/test_metadata_migration.py
+++ b/FetchMigration/python/tests/test_metadata_migration.py
@@ -88,26 +88,26 @@ def test_get_endpoint_info(self):
         # Simple base case
         test_config = create_plugin_config([host_input])
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertIsNone(result.auth)
-        self.assertTrue(result.verify_ssl)
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertIsNone(result.get_auth())
+        self.assertTrue(result.is_verify_ssl())
         # Invalid auth config
         test_config = create_plugin_config([host_input], test_user)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertIsNone(result.auth)
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertIsNone(result.get_auth())
         # Valid auth config
         test_config = create_plugin_config([host_input], user=test_user, password=test_password)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertEqual(test_user, result.auth[0])
-        self.assertEqual(test_password, result.auth[1])
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertEqual(test_user, result.get_auth()[0])
+        self.assertEqual(test_password, result.get_auth()[1])
         # Array of hosts uses the first entry
         test_config = create_plugin_config([host_input, "other_host"], test_user, test_password)
         result = metadata_migration.get_endpoint_info(test_config)
-        self.assertEqual(expected_endpoint, result.url)
-        self.assertEqual(test_user, result.auth[0])
-        self.assertEqual(test_password, result.auth[1])
+        self.assertEqual(expected_endpoint, result.get_url())
+        self.assertEqual(test_user, result.get_auth()[0])
+        self.assertEqual(test_password, result.get_auth()[1])
 
     def test_get_index_differences_empty(self):
         # Base case should return an empty list
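
Illustrative usage of the new EndpointInfo API (a minimal sketch, not part of the patch
above; the URL and credentials below are hypothetical):

    from endpoint_info import EndpointInfo

    # The constructor normalizes the URL to end with a trailing slash
    source = EndpointInfo("http://localhost:9200", auth=("admin", "admin"), verify_ssl=False)
    assert source.get_url() == "http://localhost:9200/"

    # add_path strips a leading slash, so both spellings yield the same URL
    assert source.add_path("_cat/indices") == source.add_path("/_cat/indices")

    # Value equality compares the private url, auth and verify_ssl members
    assert source == EndpointInfo("http://localhost:9200", ("admin", "admin"), False)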