From 190954e401fa9715ba97563187dda6681c6f3924 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 20 Dec 2023 15:36:46 -0800 Subject: [PATCH] [Fetch Migration] Suppress exit code for graceful termination (SIGTERM) The orchestrator now suppresses exit code 143 if it is returned by the migration monitor, since this is a result of a workaround for https://github.com/opensearch-project/data-prepper/issues/3141 . Unit tests have been added to cover this scenario. This change also ensures that a return_code is always returned by the function, and updates some stale comments. Signed-off-by: Kartik Ganesh --- FetchMigration/python/fetch_orchestrator.py | 9 +++- .../python/tests/test_fetch_orchestrator.py | 52 ++++++++++++++++--- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py index ef165e4f0..4b8ea476b 100644 --- a/FetchMigration/python/fetch_orchestrator.py +++ b/FetchMigration/python/fetch_orchestrator.py @@ -75,7 +75,7 @@ def write_inline_target_host(pipeline_file_path: str, inline_target_host: str): yaml.safe_dump(pipeline_yaml, pipeline_file) -def run(params: FetchOrchestratorParams) -> Optional[int]: +def run(params: FetchOrchestratorParams) -> int: # This is expected to be a base64 encoded string inline_pipeline = __get_env_string("INLINE_PIPELINE") inline_target_host = __get_env_string("INLINE_TARGET_HOST") @@ -93,6 +93,7 @@ def run(params: FetchOrchestratorParams) -> Optional[int]: report=True, dryrun=params.is_dry_run) logging.info("Running metadata migration...\n") metadata_migration_result = metadata_migration.run(metadata_migration_params) + return_code: int = 0 if metadata_migration_result.target_doc_count == 0: logging.warning("Target document count is zero, skipping data migration...") elif len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration(): @@ -103,8 +104,12 @@ def run(params: FetchOrchestratorParams) -> Optional[int]: migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, params.get_local_endpoint()) logging.info("Starting migration monitor...\n") - return migration_monitor.run(migration_monitor_params, proc) + return_code = migration_monitor.run(migration_monitor_params, proc) + # Suppress non-zero return code for graceful termination (SIGTERM) + if return_code == 143: + return_code = 0 logging.info("Fetch Migration workflow concluded\n") + return return_code if __name__ == '__main__': # pragma no cover diff --git a/FetchMigration/python/tests/test_fetch_orchestrator.py b/FetchMigration/python/tests/test_fetch_orchestrator.py index 84ef7517e..df53411f6 100644 --- a/FetchMigration/python/tests/test_fetch_orchestrator.py +++ b/FetchMigration/python/tests/test_fetch_orchestrator.py @@ -39,7 +39,7 @@ def tearDown(self) -> None: def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, mock_monitor: MagicMock): fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file") - # Setup mock pre-migration + # Setup mock metadata migration expected_metadata_migration_input = \ MetadataMigrationParams(fetch_params.pipeline_file_path, fetch_params.data_prepper_path + "/pipelines/pipeline.yaml", @@ -63,13 +63,15 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc @mock.patch.dict(os.environ, {}, clear=True) def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, mock_monitor: MagicMock): - # Setup empty result from pre-migration + # Setup empty result from metadata migration mock_metadata_migration.return_value = MetadataMigrationResult() - orchestrator.run(FetchOrchestratorParams("test", "test")) + result = orchestrator.run(FetchOrchestratorParams("test", "test")) mock_metadata_migration.assert_called_once_with(ANY) # Subsequent steps should not be called mock_subprocess.assert_not_called() mock_monitor.assert_not_called() + # Expect successful exit code + self.assertEqual(0, result) @patch('migration_monitor.run') @patch('subprocess.Popen') @@ -79,7 +81,7 @@ def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, moc def test_orchestrator_run_create_only(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, mock_monitor: MagicMock): fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file", create_only=True) - # Setup mock pre-migration + # Setup mock metadata migration expected_metadata_migration_input = \ MetadataMigrationParams(fetch_params.pipeline_file_path, fetch_params.data_prepper_path + "/pipelines/pipeline.yaml", @@ -87,10 +89,12 @@ def test_orchestrator_run_create_only(self, mock_metadata_migration: MagicMock, test_result = MetadataMigrationResult(10, {"index1", "index2"}) mock_metadata_migration.return_value = test_result # Run test - orchestrator.run(fetch_params) + result = orchestrator.run(fetch_params) mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input) mock_subprocess.assert_not_called() mock_monitor.assert_not_called() + # Expect successful exit code + self.assertEqual(0, result) @patch('migration_monitor.run') @patch('subprocess.Popen') @@ -100,7 +104,7 @@ def test_orchestrator_run_create_only(self, mock_metadata_migration: MagicMock, def test_orchestrator_dryrun(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, mock_monitor: MagicMock): fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file", dryrun=True) - # Setup mock pre-migration + # Setup mock metadata migration expected_metadata_migration_input = \ MetadataMigrationParams(fetch_params.pipeline_file_path, fetch_params.data_prepper_path + "/pipelines/pipeline.yaml", @@ -108,10 +112,44 @@ def test_orchestrator_dryrun(self, mock_metadata_migration: MagicMock, mock_subp test_result = MetadataMigrationResult(10, {"index1", "index2"}) mock_metadata_migration.return_value = test_result # Run test - orchestrator.run(fetch_params) + result = orchestrator.run(fetch_params) mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input) mock_subprocess.assert_not_called() mock_monitor.assert_not_called() + # Expect successful exit code + self.assertEqual(0, result) + + @patch('migration_monitor.run') + @patch('subprocess.Popen') + @patch('metadata_migration.run') + # Note that mock objects are passed bottom-up from the patch order above + @mock.patch.dict(os.environ, {}, clear=True) + def test_orchestrator_suppressed_exit_code(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, + mock_monitor: MagicMock): + # Setup mock metadata migration + test_result = MetadataMigrationResult(10, {"index1", "index2"}) + mock_metadata_migration.return_value = test_result + # Set up graceful termination exit code + mock_monitor.return_value = 143 + result = orchestrator.run(FetchOrchestratorParams("test_dp_path", "test_pipeline_file")) + # Expect non-zero exit code to be suppressed + self.assertEqual(0, result) + + @patch('migration_monitor.run') + @patch('subprocess.Popen') + @patch('metadata_migration.run') + # Note that mock objects are passed bottom-up from the patch order above + @mock.patch.dict(os.environ, {}, clear=True) + def test_orchestrator_nonzero_exit_code(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, + mock_monitor: MagicMock): + # Setup mock metadata migration + test_result = MetadataMigrationResult(10, {"index1", "index2"}) + mock_metadata_migration.return_value = test_result + sigkill_exit_code: int = 137 + mock_monitor.return_value = sigkill_exit_code + result = orchestrator.run(FetchOrchestratorParams("test_dp_path", "test_pipeline_file")) + # Expect non-zero exit code to be returned + self.assertEqual(sigkill_exit_code, result) @patch('fetch_orchestrator.write_inline_target_host') @patch('metadata_migration.run')