From 6af8de7f3ab8405b0421daf1f8cfa2a9e487f793 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 Oct 2023 15:26:07 -0700 Subject: [PATCH] Incorporate PR feedback Signed-off-by: Kartik Ganesh --- FetchMigration/python/fetch_orchestrator.py | 4 +- FetchMigration/python/metadata_migration.py | 5 +- FetchMigration/python/migration_monitor.py | 70 +++++++++---------- .../python/migration_monitor_counters.py | 10 --- .../python/tests/test_migration_monitor.py | 9 +-- 5 files changed, 40 insertions(+), 58 deletions(-) delete mode 100644 FetchMigration/python/migration_monitor_counters.py diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py index af4062dd6..22b7d516d 100644 --- a/FetchMigration/python/fetch_orchestrator.py +++ b/FetchMigration/python/fetch_orchestrator.py @@ -66,4 +66,6 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[in sys.exit(0) else: logging.error("Process exited with non-zero return code: " + str(return_code)) - sys.exit(1) + if return_code is None: + return_code = 1 + sys.exit(return_code) diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py index bd31f21f2..60775b821 100644 --- a/FetchMigration/python/metadata_migration.py +++ b/FetchMigration/python/metadata_migration.py @@ -1,10 +1,10 @@ import argparse -import yaml from typing import Optional +import yaml + import index_operations import utils - # Constants from endpoint_info import EndpointInfo from metadata_migration_params import MetadataMigrationParams @@ -174,7 +174,6 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult: diff = get_index_differences(source_indices, target_indices) # The first element in the tuple is the set of indices to create indices_to_create = diff[0] - result = MetadataMigrationResult() if indices_to_create: result.created_indices = indices_to_create result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info) diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py index 54a8e0d0c..ffe345f11 100644 --- a/FetchMigration/python/migration_monitor.py +++ b/FetchMigration/python/migration_monitor.py @@ -1,21 +1,22 @@ import argparse import logging +import math import subprocess import time from subprocess import Popen from typing import Optional, List -import math import requests from prometheus_client import Metric from prometheus_client.parser import text_string_to_metric_families from endpoint_info import EndpointInfo -from migration_monitor_counters import MigrationMonitorCounters from migration_monitor_params import MigrationMonitorParams -__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus" -__SHUTDOWN_ENDPOINT = "/shutdown" +# Path to the Data Prepper Prometheus metrics API endpoint +# Used to monitor the progress of the migration +__METRICS_API_PATH = "/metrics/prometheus" +__SHUTDOWN_API_PATH = "/shutdown" __DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess" __RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight" __NO_PARTITIONS_METRIC = "_noPartitionsAcquired" @@ -31,20 +32,16 @@ def shutdown_process(proc: Popen) -> Optional[int]: if proc.returncode is None: # Failed to terminate, send SIGKILL proc.kill() - try: - proc.wait(timeout=60) - except subprocess.TimeoutExpired: - pass return proc.returncode def shutdown_pipeline(endpoint: EndpointInfo): - shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT + shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl) def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]: - metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT + metrics_endpoint = endpoint.url + __METRICS_API_PATH try: response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl) response.raise_for_status() @@ -85,65 +82,61 @@ def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_par return False -def check_and_log_progress(endpoint, internal_metrics, target_doc_count) -> tuple[bool, MigrationMonitorCounters]: +def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \ + tuple[bool, int]: terminal: bool = False # If the API call fails, the response is empty - metrics = fetch_prometheus_metrics(endpoint) + metrics = fetch_prometheus_metrics(endpoint_info) if metrics is not None: - # Reset API failure counter - internal_metrics.seq_metric_api_fail = 0 success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC) rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC) no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC) if success_docs is not None: # pragma no cover - # Reset value failure counter - internal_metrics.seq_metric_value_fail = 0 completion_percentage: int = math.floor((success_docs * 100) / target_doc_count) progress_message: str = "Completed " + str(success_docs) + \ " docs ( " + str(completion_percentage) + "% )" logging.info(progress_message) else: - internal_metrics.seq_metric_value_fail += 1 logging.warning("Could not fetch progress stats from Data Prepper response, " + "will retry on next polling cycle...") - terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, - internal_metrics.prev_no_partitions_count, target_doc_count) + terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count, + target_doc_count) if not terminal: # Save no_partitions_count - internal_metrics.prev_no_partitions_count = no_partitions_count + prev_no_partitions_count = no_partitions_count else: - internal_metrics.seq_metric_api_fail += 1 logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...") # TODO - Handle idle non-terminal pipeline - return terminal, internal_metrics + return terminal, prev_no_partitions_count def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]: - endpoint = EndpointInfo(args.data_prepper_endpoint) + endpoint_info = EndpointInfo(args.data_prepper_endpoint) target_doc_count: int = args.target_count - # Counters to track idle pipeline and failures - internal_metrics = MigrationMonitorCounters() - # Track whether the migration has reached a terminal state - terminal = False + # Counter to track the no_partition_count metric + no_partition_count: int = 0 + is_migration_complete = False logging.info("Starting migration monitor until target doc count: " + str(target_doc_count)) # Sets returncode. A value of None means the subprocess has not yet terminated dp_process.poll() - while dp_process.returncode is None and not terminal: + while dp_process.returncode is None and not is_migration_complete: try: dp_process.wait(timeout=poll_interval_seconds) except subprocess.TimeoutExpired: pass if dp_process.returncode is None: - terminal, internal_metrics = check_and_log_progress(endpoint, internal_metrics, target_doc_count) + is_migration_complete, no_partition_count = check_and_log_progress( + endpoint_info, target_doc_count, no_partition_count) # Loop terminated - if not terminal: + if not is_migration_complete: logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode)) # TODO - Implement rollback + logging.error("Please delete any partially migrated indices before retrying the migration.") return dp_process.returncode else: # Shut down Data Prepper pipeline via API logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n") - shutdown_pipeline(endpoint) + shutdown_pipeline(endpoint_info) if dp_process.returncode is None: # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141 return shutdown_process(dp_process) @@ -152,18 +145,19 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None: - endpoint = EndpointInfo(args.data_prepper_endpoint) + endpoint_info = EndpointInfo(args.data_prepper_endpoint) target_doc_count: int = args.target_count - # Counters to track idle pipeline and failures - internal_metrics = MigrationMonitorCounters() - terminal = False + # Counter to track the no_partition_count metric + no_partition_count: int = 0 + is_migration_complete = False logging.info("Starting migration monitor until target doc count: " + str(target_doc_count)) - while not terminal: + while not is_migration_complete: time.sleep(poll_interval_seconds) - terminal, internal_metrics = check_and_log_progress(endpoint, internal_metrics, target_doc_count) + is_migration_complete, no_partition_count = check_and_log_progress( + endpoint_info, target_doc_count, no_partition_count) # Loop terminated, shut down the Data Prepper pipeline logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n") - shutdown_pipeline(endpoint) + shutdown_pipeline(endpoint_info) if __name__ == '__main__': # pragma no cover diff --git a/FetchMigration/python/migration_monitor_counters.py b/FetchMigration/python/migration_monitor_counters.py deleted file mode 100644 index 2f5a2a215..000000000 --- a/FetchMigration/python/migration_monitor_counters.py +++ /dev/null @@ -1,10 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class MigrationMonitorCounters: - prev_no_partitions_count: int = 0 - prev_success_docs: int = -1 - idle_success_doc_count: int = 0 - seq_metric_api_fail: int = 0 - seq_metric_value_fail: int = 0 diff --git a/FetchMigration/python/tests/test_migration_monitor.py b/FetchMigration/python/tests/test_migration_monitor.py index 1c773d447..c6a45f886 100644 --- a/FetchMigration/python/tests/test_migration_monitor.py +++ b/FetchMigration/python/tests/test_migration_monitor.py @@ -9,11 +9,9 @@ import migration_monitor from endpoint_info import EndpointInfo - -# Constants -from migration_monitor_counters import MigrationMonitorCounters from migration_monitor_params import MigrationMonitorParams +# Constants TEST_ENDPOINT = "test" TEST_AUTH = ("user", "pass") TEST_FLAG = False @@ -168,8 +166,7 @@ def test_process_shutdown_invocation(self, mock_check: MagicMock, mock_shut_dp: mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") - dummy = MigrationMonitorCounters() - mock_check.side_effect = [(False, dummy), (True, dummy)] + mock_check.side_effect = [(False, 1), (True, 2)] mock_subprocess = MagicMock() # set subprocess returncode to None to simulate a zombie Data Prepper process mock_subprocess.returncode = None @@ -195,7 +192,7 @@ def test_shutdown_process_terminate_fail(self): proc.wait.side_effect = [subprocess.TimeoutExpired("test", 1), None] result = migration_monitor.shutdown_process(proc) proc.terminate.assert_called_once() - self.assertEqual(2, proc.wait.call_count) + proc.wait.assert_called_once() proc.kill.assert_called_once() self.assertIsNone(result)