This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

[Fetch Migration] Improvements to subprocess handling (opensearch-project#372)

The migration_monitor module now includes a separate method, monitor_local, for monitoring a Data Prepper subprocess. In addition to interacting with and monitoring the pipeline via Data Prepper API calls (the existing implementation), the new "local" monitoring method also polls the running subprocess and checks its return code. Unit tests for these use cases have been added/updated.
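
A minimal sketch of that polling pattern, with the function name illustrative and the progress check stubbed out (the real monitor_local implementation appears in the migration_monitor.py diff below):

    import subprocess
    from subprocess import Popen


    def poll_until_exit(proc: Popen, poll_interval_seconds: int = 30) -> int:
        # poll() refreshes returncode; None means the subprocess is still running
        proc.poll()
        while proc.returncode is None:
            try:
                # Wait up to one polling interval for the process to exit
                proc.wait(timeout=poll_interval_seconds)
            except subprocess.TimeoutExpired:
                # Still running; the real monitor checks migration progress here
                pass
        return proc.returncode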

Additionally, the __main__ block of fetch_orchestrator.py now exits via a call to sys.exit(). This quits the Fetch Migration workflow with a return code bubbled up from migration_monitor.py, which indicates whether the execution succeeded (return code zero) or failed.
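
As a sketch of that exit-code handling (the helper name is illustrative; the actual __main__ block in the diff below also logs the non-zero case):

    import sys
    from typing import Optional


    def exit_with(return_code: Optional[int]) -> None:
        # Zero means a successful migration; None (no exit code observed) maps to failure
        if return_code == 0:
            sys.exit(0)
        sys.exit(return_code if return_code is not None else 1)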

Finally, this commit includes a minor optimization that short-circuits the metadata migration when no source indices are found, along with a unit test for this case.
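
A sketch of the short-circuit, with the function name and import path assumed for illustration (the actual change is in the metadata_migration.py diff below):

    from metadata_migration_result import MetadataMigrationResult  # module name assumed


    def migrate_metadata(source_indices: dict) -> MetadataMigrationResult:
        result = MetadataMigrationResult()
        # Short-circuit: nothing to diff or create when the source reports no indices
        if len(source_indices) == 0:
            return result
        # ...otherwise compute index differences and create any missing target indices
        return result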

---------

Signed-off-by: Kartik Ganesh <[email protected]>
kartg authored Oct 31, 2023
1 parent 786fa8f commit 135fedf
Showing 8 changed files with 211 additions and 80 deletions.
26 changes: 15 additions & 11 deletions FetchMigration/python/fetch_orchestrator.py
@@ -3,18 +3,19 @@
import logging
import os
import subprocess
import sys
from typing import Optional

import migration_monitor
import metadata_migration
from migration_monitor_params import MigrationMonitorParams
import migration_monitor
from metadata_migration_params import MetadataMigrationParams

from migration_monitor_params import MigrationMonitorParams

__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml"


def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX
output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX
metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True)
@@ -24,14 +25,10 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
# Kick off a subprocess for Data Prepper
logging.info("Running Data Prepper...\n")
proc = subprocess.Popen(dp_exec_path)
# Data Prepper started successfully, run the migration monitor
# Run the migration monitor next
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
logging.info("Starting migration monitor...\n")
migration_monitor.run(migration_monitor_params)
# Migration ended, the following is a workaround for
# https://github.com/opensearch-project/data-prepper/issues/3141
if proc.returncode is None:
proc.terminate()
return migration_monitor.monitor_local(migration_monitor_params, proc)


if __name__ == '__main__': # pragma no cover
@@ -64,4 +61,11 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
decoded_bytes = base64.b64decode(inline_pipeline)
with open(cli_args.config_file_path, 'wb') as config_file:
config_file.write(decoded_bytes)
run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
return_code = run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
if return_code == 0:
sys.exit(0)
else:
logging.error("Process exited with non-zero return code: " + str(return_code))
if return_code is None:
return_code = 1
sys.exit(return_code)
16 changes: 10 additions & 6 deletions FetchMigration/python/index_operations.py
@@ -15,13 +15,17 @@
def fetch_all_indices(endpoint: EndpointInfo) -> dict:
actual_endpoint = endpoint.url + __ALL_INDICES_ENDPOINT
resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
# Remove internal settings
result = dict(resp.json())
for index in result:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
for index in list(result.keys()):
# Remove system indices
if index.startswith("."):
del result[index]
# Remove internal settings
else:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result


9 changes: 6 additions & 3 deletions FetchMigration/python/metadata_migration.py
@@ -1,10 +1,10 @@
import argparse
import yaml
from typing import Optional

import yaml

import index_operations
import utils

# Constants
from endpoint_info import EndpointInfo
from metadata_migration_params import MetadataMigrationParams
@@ -163,14 +163,17 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
# We expect the Data Prepper pipeline to only have a single top-level value
pipeline_config = next(iter(dp_config.values()))
validate_pipeline_config(pipeline_config)
result = MetadataMigrationResult()
# Fetch EndpointInfo and indices
source_endpoint_info, source_indices = compute_endpoint_and_fetch_indices(pipeline_config, SOURCE_KEY)
# If source indices is empty, return immediately
if len(source_indices.keys()) == 0:
return result
target_endpoint_info, target_indices = compute_endpoint_and_fetch_indices(pipeline_config, SINK_KEY)
# Compute index differences and print report
diff = get_index_differences(source_indices, target_indices)
# The first element in the tuple is the set of indices to create
indices_to_create = diff[0]
result = MetadataMigrationResult()
if indices_to_create:
result.created_indices = indices_to_create
result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
120 changes: 92 additions & 28 deletions FetchMigration/python/migration_monitor.py
@@ -1,8 +1,10 @@
import argparse
import logging
import math
import subprocess
import time
from subprocess import Popen
from typing import Optional, List
import math

import requests
from prometheus_client import Metric
@@ -11,20 +13,35 @@
from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
# Path to the Data Prepper Prometheus metrics API endpoint
# Used to monitor the progress of the migration
__METRICS_API_PATH = "/metrics/prometheus"
__SHUTDOWN_API_PATH = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


# Gracefully shutdown a subprocess
def shutdown_process(proc: Popen) -> Optional[int]:
# Process is still running, send SIGTERM
proc.terminate()
try:
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
if proc.returncode is None:
# Failed to terminate, send SIGKILL
proc.kill()
return proc.returncode


def shutdown_pipeline(endpoint: EndpointInfo):
shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH
requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
metrics_endpoint = endpoint.url + __METRICS_API_PATH
try:
response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
response.raise_for_status()
@@ -65,35 +82,82 @@ def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_par
return False


def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
tuple[bool, int]:
terminal: bool = False
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint_info)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
else:
logging.warning("Could not fetch progress stats from Data Prepper response, " +
"will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
else:
logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
# TODO - Handle idle non-terminal pipeline
return terminal, prev_no_partitions_count


def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
# Sets returncode. A value of None means the subprocess has not yet terminated
dp_process.poll()
while dp_process.returncode is None and not is_migration_complete:
try:
dp_process.wait(timeout=poll_interval_seconds)
except subprocess.TimeoutExpired:
pass
if dp_process.returncode is None:
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated
if not is_migration_complete:
logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
# TODO - Implement rollback
logging.error("Please delete any partially migrated indices before retrying the migration.")
return dp_process.returncode
else:
# Shut down Data Prepper pipeline via API
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint_info)
if dp_process.returncode is None:
# Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
return shutdown_process(dp_process)
else:
return dp_process.returncode


def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
endpoint = EndpointInfo(args.data_prepper_endpoint)
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
prev_no_partitions_count = 0
terminal = False
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
while not terminal:
while not is_migration_complete:
time.sleep(poll_interval_seconds)
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
else:
logging.info("Could not fetch metrics from Data Prepper, will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated, shut down the Data Prepper pipeline
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint)
shutdown_pipeline(endpoint_info)


if __name__ == '__main__': # pragma no cover
32 changes: 3 additions & 29 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -9,7 +9,7 @@

class TestFetchOrchestrator(unittest.TestCase):

@patch('migration_monitor.run')
@patch('migration_monitor.monitor_local')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
@@ -31,35 +31,9 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
expected_dp_runnable = test_path + "/bin/data-prepper"
mock_subprocess.assert_called_once_with(expected_dp_runnable)
mock_monitor.assert_called_once_with(expected_monitor_input)
mock_subprocess.return_value.terminate.assert_not_called()
mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
def test_orchestrator_shutdown_workaround(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
test_path = "test_path"
test_file = "test_file"
test_host = "test_host"
# Setup mock pre-migration
expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml",
report=True)
test_result = MetadataMigrationResult(10, {"index1", "index2"})
expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host)
mock_metadata_migration.return_value = test_result
# set subprocess return value to None to simulate a zombie Data Prepper process
mock_subprocess.return_value.returncode = None
# Run test
orchestrator.run(test_path, test_file, test_host)
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
expected_dp_runnable = test_path + "/bin/data-prepper"
mock_subprocess.assert_called_once_with(expected_dp_runnable)
mock_monitor.assert_called_once_with(expected_monitor_input)
mock_subprocess.return_value.terminate.assert_called_once()

@patch('migration_monitor.run')
@patch('migration_monitor.monitor_local')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
12 changes: 11 additions & 1 deletion FetchMigration/python/tests/test_index_operations.py
@@ -14,7 +14,17 @@ class TestSearchEndpoint(unittest.TestCase):
@responses.activate
def test_fetch_all_indices(self):
# Set up GET response
responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_constants.BASE_INDICES_DATA)
test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA)
# Add system index
test_data[".system-index"] = {
test_constants.SETTINGS_KEY: {
test_constants.INDEX_KEY: {
test_constants.NUM_SHARDS_SETTING: 1,
test_constants.NUM_REPLICAS_SETTING: 1
}
}
}
responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_data)
# Now send request
index_data = index_operations.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT))
self.assertEqual(3, len(index_data.keys()))
10 changes: 10 additions & 0 deletions FetchMigration/python/tests/test_metadata_migration.py
@@ -314,6 +314,16 @@ def test_missing_output_file_non_report(self):
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH)
self.assertRaises(ValueError, metadata_migration.run, test_input)

@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_no_indices_in_source(self, mock_fetch_indices: MagicMock):
mock_fetch_indices.return_value = {}
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
test_result = metadata_migration.run(test_input)
mock_fetch_indices.assert_called_once()
self.assertEqual(0, test_result.target_doc_count)
self.assertEqual(0, len(test_result.created_indices))


if __name__ == '__main__':
unittest.main()