Incorporate PR feedback
Signed-off-by: Kartik Ganesh <[email protected]>
kartg committed Oct 31, 2023
1 parent 347bc5a commit 6af8de7
Showing 5 changed files with 40 additions and 58 deletions.
4 changes: 3 additions & 1 deletion FetchMigration/python/fetch_orchestrator.py
@@ -66,4 +66,6 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
         sys.exit(0)
     else:
         logging.error("Process exited with non-zero return code: " + str(return_code))
-        sys.exit(1)
+        if return_code is None:
+            return_code = 1
+        sys.exit(return_code)
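
The orchestrator now propagates the Data Prepper child process's own exit code instead of a hard-coded 1, falling back to 1 only when no code is available. A minimal sketch of the resulting flow; this helper and its name are illustrative, not part of the source file:

    import logging
    import sys
    from typing import Optional


    def exit_with_code(return_code: Optional[int]) -> None:
        # Hypothetical helper mirroring the new behavior: propagate the child
        # process's exit code, coercing None (no code was set) to 1.
        if return_code == 0:
            sys.exit(0)
        logging.error("Process exited with non-zero return code: " + str(return_code))
        if return_code is None:
            return_code = 1
        sys.exit(return_code)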
5 changes: 2 additions & 3 deletions FetchMigration/python/metadata_migration.py
@@ -1,10 +1,10 @@
 import argparse
-import yaml
 from typing import Optional
 
+import yaml
+
 import index_operations
 import utils
 
-# Constants
 from endpoint_info import EndpointInfo
 from metadata_migration_params import MetadataMigrationParams
@@ -174,7 +174,6 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
     diff = get_index_differences(source_indices, target_indices)
     # The first element in the tuple is the set of indices to create
     indices_to_create = diff[0]
-    result = MetadataMigrationResult()
     if indices_to_create:
         result.created_indices = indices_to_create
         result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
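
Deleting the late `result = MetadataMigrationResult()` construction implies `result` is now created earlier in run(), before the branch, so both the "indices to create" and the "nothing to do" paths return an initialized object. A self-contained sketch of that shape, using dict stand-ins for the real result object and helpers:

    def run_sketch(source_indices: set, target_indices: set) -> dict:
        # Stand-in shape: construct the result up front so every path returns it.
        result = {"created_indices": set(), "target_doc_count": 0}
        indices_to_create = source_indices - target_indices  # stand-in for diff[0]
        if indices_to_create:
            result["created_indices"] = indices_to_create
            result["target_doc_count"] = 42  # stand-in for index_operations.doc_count(...)
        return result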
70 changes: 32 additions & 38 deletions FetchMigration/python/migration_monitor.py
@@ -1,21 +1,22 @@
 import argparse
 import logging
+import math
 import subprocess
 import time
 from subprocess import Popen
 from typing import Optional, List
-import math
 
 import requests
 from prometheus_client import Metric
 from prometheus_client.parser import text_string_to_metric_families
 
 from endpoint_info import EndpointInfo
-from migration_monitor_counters import MigrationMonitorCounters
 from migration_monitor_params import MigrationMonitorParams
 
-__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
-__SHUTDOWN_ENDPOINT = "/shutdown"
+# Path to the Data Prepper Prometheus metrics API endpoint
+# Used to monitor the progress of the migration
+__METRICS_API_PATH = "/metrics/prometheus"
+__SHUTDOWN_API_PATH = "/shutdown"
 __DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
 __RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
 __NO_PARTITIONS_METRIC = "_noPartitionsAcquired"
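
For context, text_string_to_metric_families from prometheus_client parses a plaintext Prometheus exposition payload into Metric families; the monitor uses it to read counters such as _opensearch_documentsSuccess. A small standalone sketch of that pattern, with an illustrative payload rather than one fetched from a live Data Prepper instance:

    from prometheus_client.parser import text_string_to_metric_families

    # Illustrative payload; a real one would come from GET <endpoint>/metrics/prometheus
    payload = "example_documentsSuccess 42.0\n"

    for family in text_string_to_metric_families(payload):
        for sample in family.samples:
            # Each sample carries a name, a label dict, and a float value
            if sample.name.endswith("documentsSuccess"):
                print(sample.name, sample.value)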
@@ -31,20 +32,16 @@ def shutdown_process(proc: Popen) -> Optional[int]:
     if proc.returncode is None:
         # Failed to terminate, send SIGKILL
         proc.kill()
-        try:
-            proc.wait(timeout=60)
-        except subprocess.TimeoutExpired:
-            pass
     return proc.returncode
 
 
 def shutdown_pipeline(endpoint: EndpointInfo):
-    shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
+    shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH
     requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
 
 
 def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
-    metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
+    metrics_endpoint = endpoint.url + __METRICS_API_PATH
     try:
         response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
         response.raise_for_status()
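
With the post-kill wait removed, shutdown_process no longer blocks after sending SIGKILL, so returncode can still be None if the OS has not yet reaped the child; the updated test at the bottom of this commit asserts exactly that. A hedged sketch of the full terminate-then-kill flow, with the opening terminate and wait lines assumed from the unexpanded part of the hunk:

    import subprocess
    from subprocess import Popen
    from typing import Optional


    def shutdown_process_sketch(proc: Popen) -> Optional[int]:
        # Ask politely first (SIGTERM) and give the process time to exit
        proc.terminate()
        try:
            proc.wait(timeout=60)
        except subprocess.TimeoutExpired:
            pass
        if proc.returncode is None:
            # Failed to terminate, send SIGKILL; no second wait, so the
            # caller may observe returncode is None for an unreaped child
            proc.kill()
        return proc.returncode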
@@ -85,65 +82,61 @@ def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_par
     return False
 
 
-def check_and_log_progress(endpoint, internal_metrics, target_doc_count) -> tuple[bool, MigrationMonitorCounters]:
+def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
+        tuple[bool, int]:
     terminal: bool = False
     # If the API call fails, the response is empty
-    metrics = fetch_prometheus_metrics(endpoint)
+    metrics = fetch_prometheus_metrics(endpoint_info)
     if metrics is not None:
-        # Reset API failure counter
-        internal_metrics.seq_metric_api_fail = 0
         success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
         rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
         no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
         if success_docs is not None:  # pragma no cover
-            # Reset value failure counter
-            internal_metrics.seq_metric_value_fail = 0
             completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
             progress_message: str = "Completed " + str(success_docs) + \
                                     " docs ( " + str(completion_percentage) + "% )"
             logging.info(progress_message)
         else:
-            internal_metrics.seq_metric_value_fail += 1
             logging.warning("Could not fetch progress stats from Data Prepper response, " +
                             "will retry on next polling cycle...")
-        terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
-                                     internal_metrics.prev_no_partitions_count, target_doc_count)
+        terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
+                                     target_doc_count)
         if not terminal:
             # Save no_partitions_count
-            internal_metrics.prev_no_partitions_count = no_partitions_count
+            prev_no_partitions_count = no_partitions_count
     else:
-        internal_metrics.seq_metric_api_fail += 1
         logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
     # TODO - Handle idle non-terminal pipeline
-    return terminal, internal_metrics
+    return terminal, prev_no_partitions_count
 
 
 def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
-    endpoint = EndpointInfo(args.data_prepper_endpoint)
+    endpoint_info = EndpointInfo(args.data_prepper_endpoint)
     target_doc_count: int = args.target_count
-    # Counters to track idle pipeline and failures
-    internal_metrics = MigrationMonitorCounters()
-    # Track whether the migration has reached a terminal state
-    terminal = False
+    # Counter to track the no_partition_count metric
+    no_partition_count: int = 0
+    is_migration_complete = False
     logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
     # Sets returncode. A value of None means the subprocess has not yet terminated
     dp_process.poll()
-    while dp_process.returncode is None and not terminal:
+    while dp_process.returncode is None and not is_migration_complete:
         try:
             dp_process.wait(timeout=poll_interval_seconds)
         except subprocess.TimeoutExpired:
             pass
         if dp_process.returncode is None:
-            terminal, internal_metrics = check_and_log_progress(endpoint, internal_metrics, target_doc_count)
+            is_migration_complete, no_partition_count = check_and_log_progress(
+                endpoint_info, target_doc_count, no_partition_count)
     # Loop terminated
-    if not terminal:
+    if not is_migration_complete:
         logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
         # TODO - Implement rollback
         logging.error("Please delete any partially migrated indices before retrying the migration.")
         return dp_process.returncode
     else:
         # Shut down Data Prepper pipeline via API
         logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
-        shutdown_pipeline(endpoint)
+        shutdown_pipeline(endpoint_info)
         if dp_process.returncode is None:
             # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
             return shutdown_process(dp_process)
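
The refactor replaces the MigrationMonitorCounters state object with a plain (bool, int) tuple that callers thread through each polling iteration. A minimal, runnable sketch of that calling pattern, with an illustrative stand-in for check_and_log_progress:

    import time


    def check_sketch(target_doc_count: int, prev_count: int) -> tuple[bool, int]:
        # Stand-in for check_and_log_progress: report whether the migration is
        # complete, and return the no-partitions count to carry into the next poll
        return prev_count >= 2, prev_count + 1


    def poll_until_complete(target_doc_count: int, poll_interval_seconds: int = 1) -> None:
        no_partition_count = 0
        is_migration_complete = False
        while not is_migration_complete:
            time.sleep(poll_interval_seconds)
            is_migration_complete, no_partition_count = check_sketch(
                target_doc_count, no_partition_count)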
@@ -152,18 +145,19 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
 
 
 def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
-    endpoint = EndpointInfo(args.data_prepper_endpoint)
+    endpoint_info = EndpointInfo(args.data_prepper_endpoint)
     target_doc_count: int = args.target_count
-    # Counters to track idle pipeline and failures
-    internal_metrics = MigrationMonitorCounters()
-    terminal = False
+    # Counter to track the no_partition_count metric
+    no_partition_count: int = 0
+    is_migration_complete = False
     logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
-    while not terminal:
+    while not is_migration_complete:
         time.sleep(poll_interval_seconds)
-        terminal, internal_metrics = check_and_log_progress(endpoint, internal_metrics, target_doc_count)
+        is_migration_complete, no_partition_count = check_and_log_progress(
+            endpoint_info, target_doc_count, no_partition_count)
     # Loop terminated, shut down the Data Prepper pipeline
     logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
-    shutdown_pipeline(endpoint)
+    shutdown_pipeline(endpoint_info)
 
 
 if __name__ == '__main__':  # pragma no cover
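
For reference, a hypothetical direct invocation of the monitor, mirroring how the test below constructs MigrationMonitorParams(target_count, endpoint); the document count and endpoint URL here are illustrative values, not taken from the repository:

    import migration_monitor
    from migration_monitor_params import MigrationMonitorParams

    # Illustrative: monitor until 10000 docs are reported, polling every 30s
    args = MigrationMonitorParams(10000, "http://localhost:4900")
    migration_monitor.run(args, poll_interval_seconds=30)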
10 changes: 0 additions & 10 deletions FetchMigration/python/migration_monitor_counters.py

This file was deleted.

9 changes: 3 additions & 6 deletions FetchMigration/python/tests/test_migration_monitor.py
@@ -9,11 +9,9 @@
 
 import migration_monitor
 from endpoint_info import EndpointInfo
-
-# Constants
-from migration_monitor_counters import MigrationMonitorCounters
 from migration_monitor_params import MigrationMonitorParams
 
+# Constants
 TEST_ENDPOINT = "test"
 TEST_AUTH = ("user", "pass")
 TEST_FLAG = False
@@ -168,8 +166,7 @@ def test_process_shutdown_invocation(self, mock_check: MagicMock, mock_shut_dp:
                                          mock_shut_proc: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
-        dummy = MigrationMonitorCounters()
-        mock_check.side_effect = [(False, dummy), (True, dummy)]
+        mock_check.side_effect = [(False, 1), (True, 2)]
         mock_subprocess = MagicMock()
         # set subprocess returncode to None to simulate a zombie Data Prepper process
         mock_subprocess.returncode = None
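
The mocked side_effect now yields (bool, int) tuples matching the new check_and_log_progress return type: one incomplete poll, then a complete one. A tiny standalone illustration of this mock pattern, with illustrative arguments:

    from unittest.mock import MagicMock

    mock_check = MagicMock()
    # First call reports "not complete", second reports "complete"
    mock_check.side_effect = [(False, 1), (True, 2)]

    assert mock_check("endpoint", 10, 0) == (False, 1)
    assert mock_check("endpoint", 10, 1) == (True, 2)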
@@ -195,7 +192,7 @@ def test_shutdown_process_terminate_fail(self):
         proc.wait.side_effect = [subprocess.TimeoutExpired("test", 1), None]
         result = migration_monitor.shutdown_process(proc)
         proc.terminate.assert_called_once()
-        self.assertEqual(2, proc.wait.call_count)
+        proc.wait.assert_called_once()
         proc.kill.assert_called_once()
         self.assertIsNone(result)
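
This assertion change follows directly from dropping the wait-after-kill in shutdown_process: wait() now runs exactly once (after terminate), times out, kill() is sent, and the function returns with returncode still None. A standalone snippet showing the side_effect list semantics the test relies on, where an exception instance in the list is raised on that call:

    import subprocess
    from unittest.mock import MagicMock

    proc = MagicMock()
    proc.returncode = None
    # First wait() raises TimeoutExpired; a hypothetical second would return None
    proc.wait.side_effect = [subprocess.TimeoutExpired("test", 1), None]

    proc.terminate()
    try:
        proc.wait(timeout=60)
    except subprocess.TimeoutExpired:
        pass
    if proc.returncode is None:
        proc.kill()

    assert proc.wait.call_count == 1  # matches the updated assertion above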

