This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

[Fetch Migration] Refactoring migration_monitor.py functions
The run and monitor_local functions have been merged into a single function, since most of their code and logic was identical. Unit tests have been updated for improved coverage.

Signed-off-by: Kartik Ganesh <[email protected]>
kartg committed Oct 31, 2023
1 parent e87e33c commit a8bf44c
Showing 5 changed files with 38 additions and 42 deletions.
2 changes: 1 addition & 1 deletion FetchMigration/python/fetch_orchestrator.py
@@ -28,7 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[in
     # Run the migration monitor next
     migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
     logging.info("Starting migration monitor...\n")
-    return migration_monitor.monitor_local(migration_monitor_params, proc)
+    return migration_monitor.run(migration_monitor_params, proc)


 if __name__ == '__main__': # pragma no cover
54 changes: 23 additions & 31 deletions FetchMigration/python/migration_monitor.py
@@ -91,23 +91,30 @@ def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetric
     return progress


-def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
+def __should_continue_monitoring(metrics: ProgressMetrics, proc: Optional[Popen] = None) -> bool:
+    return not metrics.is_in_terminal_state() and (proc is None or is_process_alive(proc))
+
+
+# Last parameter is optional, and signifies a local Data Prepper process
+def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
     progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
     logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
-    # Sets returncode. A value of None means the subprocess has not yet terminated
-    dp_process.poll()
-    while is_process_alive(dp_process) and not progress_metrics.is_in_terminal_state():
-        try:
-            dp_process.wait(timeout=poll_interval_seconds)
-        except subprocess.TimeoutExpired:
-            pass
-        if is_process_alive(dp_process):
+    while __should_continue_monitoring(progress_metrics, dp_process):
+        if dp_process is not None:
+            # Wait on local process
+            try:
+                dp_process.wait(timeout=poll_interval_seconds)
+            except subprocess.TimeoutExpired:
+                pass
+        else:
+            # Thread sleep
+            time.sleep(poll_interval_seconds)
+        if dp_process is None or is_process_alive(dp_process):
             progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
-            # Log debug metrics
-            progress_metrics.log_idle_pipeline_debug_metrics()
-    # Loop terminated, shut down the Data Prepper pipeline
+    # Loop terminated
     if not progress_metrics.is_in_terminal_state():
+        # This will only happen for a local Data Prepper process
         logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
         # TODO - Implement rollback
         logging.error("Please delete any partially migrated indices before retrying the migration.")
@@ -121,31 +128,16 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval
             logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
         # Shut down Data Prepper pipeline via API
         shutdown_pipeline(endpoint_info)
-        if is_process_alive(dp_process):
+        if dp_process is None:
+            # No local process
+            return 0
+        elif is_process_alive(dp_process):
             # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
             return shutdown_process(dp_process)
         else:
             return dp_process.returncode


-def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
-    endpoint_info = EndpointInfo(args.data_prepper_endpoint)
-    progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
-    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
-    while not progress_metrics.is_in_terminal_state():
-        time.sleep(poll_interval_seconds)
-        progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
-    # Loop terminated, shut down the Data Prepper pipeline
-    if progress_metrics.is_migration_complete_success():
-        logging.info("Migration monitor observed successful migration, shutting down...\n")
-    elif progress_metrics.is_migration_idle():
-        logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
-    elif progress_metrics.is_too_may_api_failures():
-        logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
-    # Shut down Data Prepper pipeline via API
-    shutdown_pipeline(endpoint_info)
-
-
 if __name__ == '__main__': # pragma no cover
     # Set up parsing for command line arguments
     arg_parser = argparse.ArgumentParser(
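For orientation, here is a minimal usage sketch of the merged entry point, based on the new signature above. The endpoint URL, target document count, and Data Prepper launch command are illustrative placeholders, not values taken from this commit.

# Sketch only: the two ways the unified run() can now be invoked.
from subprocess import Popen

import migration_monitor
from migration_monitor_params import MigrationMonitorParams

params = MigrationMonitorParams(1000, "https://localhost:4900")  # placeholder values

# Remote-only monitoring: no local process is passed, so the monitor sleeps between
# polls and returns 0 once the pipeline reaches a terminal state.
exit_code = migration_monitor.run(params)

# Local-process monitoring: pass the Popen handle so the monitor waits on the
# subprocess between polls and returns its exit code (or shuts it down if needed).
dp_process = Popen(["data-prepper", "pipeline.yaml"])  # hypothetical launch command
exit_code = migration_monitor.run(params, dp_process)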
1 change: 1 addition & 0 deletions FetchMigration/python/migration_monitor_params.py
@@ -5,3 +5,4 @@
 class MigrationMonitorParams:
     target_count: int
     data_prepper_endpoint: str
+    is_local_process: bool = False
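As a side note, here is a small sketch of how the new optional field behaves, assuming MigrationMonitorParams is a dataclass (the decorator sits above the lines shown in this hunk). Existing positional call sites keep working, and the flag defaults to False; the values below are placeholders.

from migration_monitor_params import MigrationMonitorParams

params = MigrationMonitorParams(10, "localhost:4900")  # placeholder values
assert params.is_local_process is False

# A caller monitoring a locally launched Data Prepper process could opt in explicitly;
# this commit only introduces the flag and does not show it being consumed yet.
local_params = MigrationMonitorParams(10, "localhost:4900", is_local_process=True)
assert local_params.is_local_process is True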
4 changes: 2 additions & 2 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -9,7 +9,7 @@

 class TestFetchOrchestrator(unittest.TestCase):

-    @patch('migration_monitor.monitor_local')
+    @patch('migration_monitor.run')
     @patch('subprocess.Popen')
     @patch('metadata_migration.run')
     # Note that mock objects are passed bottom-up from the patch order above
@@ -33,7 +33,7 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
         mock_subprocess.assert_called_once_with(expected_dp_runnable)
         mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value)

-    @patch('migration_monitor.monitor_local')
+    @patch('migration_monitor.run')
     @patch('subprocess.Popen')
     @patch('metadata_migration.run')
     # Note that mock objects are passed bottom-up from the patch order above
19 changes: 11 additions & 8 deletions FetchMigration/python/tests/test_migration_monitor.py
@@ -133,23 +133,26 @@ def test_check_and_log_progress(self, mock_fetch: MagicMock, mock_get_metric: Ma
         # All-docs-migrated check is invoked
         mock_progress.all_docs_migrated.assert_called_once()

+    @patch('migration_monitor.shutdown_process')
     @patch('migration_monitor.shutdown_pipeline')
     @patch('time.sleep')
     @patch('migration_monitor.check_and_log_progress')
     # Note that mock objects are passed bottom-up from the patch order above
-    def test_run(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut: MagicMock):
+    def test_monitor_non_local(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut_dp: MagicMock,
+                               mock_shut_proc: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
         mock_progress = MagicMock()
         mock_progress.is_in_terminal_state.return_value = True
         mock_check.return_value = mock_progress
         # Run test method
         wait_time = 3
-        migration_monitor.run(test_input, wait_time)
+        migration_monitor.run(test_input, None, wait_time)
         # Test that fetch was called with the expected EndpointInfo
         expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
         mock_sleep.assert_called_with(wait_time)
-        mock_shut.assert_called_once_with(expected_endpoint_info)
+        mock_shut_dp.assert_called_once_with(expected_endpoint_info)
+        mock_shut_proc.assert_not_called()

     @patch('migration_monitor.shutdown_process')
     @patch('migration_monitor.shutdown_pipeline')
@@ -162,7 +165,7 @@ def test_monitor_local_process_exit(self, mock_shut_dp: MagicMock, mock_shut_pro
         expected_return_code: int = 1
         mock_subprocess.returncode = expected_return_code
         # Run test method
-        return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        return_code = migration_monitor.run(test_input, mock_subprocess)
         self.assertEqual(expected_return_code, return_code)
         mock_shut_dp.assert_not_called()
         mock_shut_proc.assert_not_called()
@@ -178,18 +181,18 @@ def test_monitor_local_migration_complete(self, mock_check: MagicMock, mock_is_a
         test_input = MigrationMonitorParams(1, "test")
         # Simulate a successful migration
         mock_progress = MagicMock()
-        mock_progress.is_in_terminal_state.return_value = True
+        mock_progress.is_in_terminal_state.side_effect = [False, True]
         mock_progress.is_migration_complete_success.return_value = True
         mock_check.return_value = mock_progress
         # Sequence of expected return values for a process that terminates successfully
-        mock_is_alive.side_effect = [True, True, True, False]
+        mock_is_alive.side_effect = [True, True, False, False]
         mock_subprocess = MagicMock()
         expected_return_code: int = 0
         mock_subprocess.returncode = expected_return_code
         # Simulate timeout on wait
         mock_subprocess.wait.side_effect = [subprocess.TimeoutExpired("test", 1)]
         # Run test method
-        actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        actual_return_code = migration_monitor.run(test_input, mock_subprocess)
         self.assertEqual(expected_return_code, actual_return_code)
         expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
         mock_check.assert_called_once_with(expected_endpoint_info, ANY)
@@ -215,7 +218,7 @@ def test_monitor_local_shutdown_process(self, mock_check: MagicMock, mock_shut_d
         expected_return_code: int = 137
         mock_shut_proc.return_value = 137
         # Run test method
-        actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        actual_return_code = migration_monitor.run(test_input, mock_subprocess)
         self.assertEqual(expected_return_code, actual_return_code)
         expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
         mock_shut_dp.assert_called_once_with(expected_endpoint_info)
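A side note on the mock changes above: assigning a list to side_effect makes a MagicMock return successive values on successive calls, whereas return_value yields the same value every time. The sequences in these tests script what the merged run() observes about process liveness and terminal state on each check. A minimal, repository-independent illustration:

from unittest.mock import MagicMock

mock_progress = MagicMock()
mock_progress.is_in_terminal_state.side_effect = [False, True]

print(mock_progress.is_in_terminal_state())  # False on the first call
print(mock_progress.is_in_terminal_state())  # True on the second call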
