From 0cf7eccbce80e51721125a135b3999be167ee7df Mon Sep 17 00:00:00 2001
From: Kartik Ganesh
Date: Mon, 11 Dec 2023 10:51:52 -0800
Subject: [PATCH] [WIP 2] Enable Migration Console to run metadata migration logic directly

TODO - Currently the Python files have been duplicated from FetchMigration.
Need to find a better way to lay this out. Also, the Fetch Migration Python
code is not installed by the Dockerfile since it causes conflicts with the
botocore library from OpenSearch Benchmark.

This change adds a "runOSIHistoricalDataMigration" shell script to the
Migration Console image and adds permissions for the Console to fetch the
pipeline secret value from Secrets Manager.

Signed-off-by: Kartik Ganesh
---
 .../main/docker/migrationConsole/Dockerfile    |   6 +
 .../migrationConsole/python/endpoint_info.py   |  50 ++++++
 .../migrationConsole/python/endpoint_utils.py  | 164 ++++++++++++++++++
 .../migrationConsole/python/index_diff.py      |  44 +++++
 .../python/index_doc_count.py                  |  17 ++
 .../python/index_operations.py                 | 106 +++++++++++
 .../python/metadata_migration.py               | 140 +++++++++++++++
 .../python/metadata_migration_params.py        |  18 ++
 .../python/metadata_migration_result.py        |  17 ++
 .../python/osi_data_migration.py               |  60 +++++++
 .../migrationConsole/python/requirements.txt   |   8 +
 .../docker/migrationConsole/python/utils.py    |  31 ++++
 .../runOSIHistoricalDataMigration.sh           |  58 +++++++
 .../service-stacks/migration-console-stack.ts  |   9 +
 14 files changed, 728 insertions(+)
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py
 create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh

diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
index 9518cb2a1..09fbfca79 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
@@ -13,12 +13,18 @@ COPY runTestBenchmarks.sh /root/
 COPY humanReadableLogs.py /root/
 COPY catIndices.sh /root/
 COPY showFetchMigrationCommand.sh /root/
+COPY runOSIHistoricalDataMigration.sh /root/
 COPY msk-iam-auth.properties /root/kafka-tools/aws
 COPY kafkaCmdRef.md /root/kafka-tools
 RUN chmod ug+x /root/runTestBenchmarks.sh
 RUN chmod ug+x /root/humanReadableLogs.py
 RUN chmod ug+x /root/catIndices.sh
 RUN chmod ug+x /root/showFetchMigrationCommand.sh
+RUN chmod ug+x /root/runOSIHistoricalDataMigration.sh
+COPY python/ /root/python/
+# TODO - Installing here messes with OSB's botocore
+#WORKDIR /root/python
+#RUN pip3 install --user -r requirements.txt
 WORKDIR /root/kafka-tools
 # Get kafka distribution and unpack to 'kafka'
 RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py
new file mode 100644
index 000000000..4f06c7427
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py
@@ -0,0 +1,50 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from typing import Union
+
+from requests_aws4auth import AWS4Auth
+
+
+# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
+class EndpointInfo:
+    # Private member variables
+    __url: str
+    # "|" operator is only supported in 3.10+
+    __auth: Union[AWS4Auth, tuple, None]
+    __verify_ssl: bool
+
+    def __init__(self, url: str, auth: Union[AWS4Auth, tuple, None] = None, verify_ssl: bool = True):
+        self.__url = url
+        # Normalize url value to have trailing slash
+        if not url.endswith("/"):
+            self.__url += "/"
+        self.__auth = auth
+        self.__verify_ssl = verify_ssl
+
+    def __eq__(self, obj):
+        return isinstance(obj, EndpointInfo) and \
+            self.__url == obj.__url and \
+            self.__auth == obj.__auth and \
+            self.__verify_ssl == obj.__verify_ssl
+
+    def add_path(self, path: str) -> str:
+        # Remove leading slash if present
+        if path.startswith("/"):
+            path = path[1:]
+        return self.__url + path
+
+    def get_url(self) -> str:
+        return self.__url
+
+    def get_auth(self) -> Union[AWS4Auth, tuple, None]:
+        return self.__auth
+
+    def is_verify_ssl(self) -> bool:
+        return self.__verify_ssl
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py
new file mode 100644
index 000000000..3b1b3a8ae
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py
@@ -0,0 +1,164 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+import re
+from typing import Optional, Union
+
+from requests_aws4auth import AWS4Auth
+from botocore.session import Session
+
+from endpoint_info import EndpointInfo
+
+# Constants
+SOURCE_KEY = "source"
+SINK_KEY = "sink"
+SUPPORTED_PLUGINS = ["opensearch", "elasticsearch"]
+HOSTS_KEY = "hosts"
+INSECURE_KEY = "insecure"
+CONNECTION_KEY = "connection"
+DISABLE_AUTH_KEY = "disable_authentication"
+USER_KEY = "username"
+PWD_KEY = "password"
+AWS_SIGV4_KEY = "aws_sigv4"
+AWS_REGION_KEY = "aws_region"
+AWS_CONFIG_KEY = "aws"
+AWS_CONFIG_REGION_KEY = "region"
+IS_SERVERLESS_KEY = "serverless"
+ES_SERVICE_NAME = "es"
+AOSS_SERVICE_NAME = "aoss"
+URL_REGION_PATTERN = re.compile(r"([\w-]*)\.(es|aoss)\.amazonaws\.com")
+
+
+def __get_url(plugin_config: dict) -> str:
+    # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
+    # This tool needs one accessible host, so we pick the first entry in the latter case.
+    return plugin_config[HOSTS_KEY][0] if isinstance(plugin_config[HOSTS_KEY], list) else plugin_config[HOSTS_KEY]
+
+
+# Helper function that attempts to extract the AWS region from a URL,
+# assuming it is of the form *.<region>.(es|aoss).amazonaws.com
+def __derive_aws_region_from_url(url: str) -> Optional[str]:
+    match = URL_REGION_PATTERN.search(url)
+    if match:
+        # Index 0 returns the entire match, index 1 returns only the first group
+        return match.group(1)
+    return None
+
+
+def get_aws_region(plugin_config: dict) -> str:
+    if plugin_config.get(AWS_SIGV4_KEY, False) and plugin_config.get(AWS_REGION_KEY, None) is not None:
+        return plugin_config[AWS_REGION_KEY]
+    elif plugin_config.get(AWS_CONFIG_KEY, None) is not None:
+        aws_config = plugin_config[AWS_CONFIG_KEY]
+        if not isinstance(aws_config, dict):
+            raise ValueError("Unexpected value for 'aws' configuration")
+        elif aws_config.get(AWS_CONFIG_REGION_KEY, None) is not None:
+            return aws_config[AWS_CONFIG_REGION_KEY]
+    # Region not explicitly defined, attempt to derive from URL
+    derived_region = __derive_aws_region_from_url(__get_url(plugin_config))
+    if derived_region is None:
+        raise ValueError("No region configured for AWS SigV4 auth, or derivable from host URL")
+    return derived_region
+
+
+def __check_supported_endpoint(section_config: dict) -> Optional[tuple]:
+    for supported_type in SUPPORTED_PLUGINS:
+        if supported_type in section_config:
+            return supported_type, section_config[supported_type]
+
+
+# This config key may be either directly in the main dict (for sink)
+# or inside a nested dict (for source). The default value is False.
+def is_insecure(plugin_config: dict) -> bool:
+    if INSECURE_KEY in plugin_config:
+        return plugin_config[INSECURE_KEY]
+    elif CONNECTION_KEY in plugin_config and INSECURE_KEY in plugin_config[CONNECTION_KEY]:
+        return plugin_config[CONNECTION_KEY][INSECURE_KEY]
+    return False
+
+
+def validate_pipeline(pipeline: dict):
+    if SOURCE_KEY not in pipeline:
+        raise ValueError("Missing source configuration in Data Prepper pipeline YAML")
+    if SINK_KEY not in pipeline:
+        raise ValueError("Missing sink configuration in Data Prepper pipeline YAML")
+
+
+def validate_auth(plugin_name: str, plugin_config: dict):
+    # If auth is disabled, no further validation is required
+    if plugin_config.get(DISABLE_AUTH_KEY, False):
+        return
+    # If AWS SigV4 is configured, validate region
+    if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
+        # Raises a ValueError if region cannot be derived
+        get_aws_region(plugin_config)
+    # Validate basic auth
+    elif USER_KEY not in plugin_config:
+        raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name)
+    elif PWD_KEY not in plugin_config:
+        raise ValueError("Invalid auth configuration (no password for username) for plugin: " + plugin_name)
+
+
+def get_supported_endpoint_config(pipeline_config: dict, section_key: str) -> tuple:
+    # The value of each key may be a single plugin (as a dict) or a list of plugin configs
+    supported_tuple = tuple()
+    if isinstance(pipeline_config[section_key], dict):
+        supported_tuple = __check_supported_endpoint(pipeline_config[section_key])
+    elif isinstance(pipeline_config[section_key], list):
+        for entry in pipeline_config[section_key]:
+            supported_tuple = __check_supported_endpoint(entry)
+            # Break out of the loop at the first supported type
+            if supported_tuple:
+                break
+    if not supported_tuple:
+        raise ValueError("Could not find any supported endpoints in section: " + section_key)
+    # First tuple value is the plugin name, second value is the plugin config dict
+    return supported_tuple[0], supported_tuple[1]
+
+
+def get_aws_sigv4_auth(region: str, is_serverless: bool = False) -> AWS4Auth:
+    credentials = Session().get_credentials()
+    if not credentials:
+        raise ValueError("Unable to fetch AWS session credentials for SigV4 auth")
+    if is_serverless:
+        return AWS4Auth(region=region, service=AOSS_SERVICE_NAME, refreshable_credentials=credentials)
+    else:
+        return AWS4Auth(region=region, service=ES_SERVICE_NAME, refreshable_credentials=credentials)
+
+
+def get_auth(plugin_config: dict) -> Union[AWS4Auth, tuple, None]:
+    # Basic auth
+    if USER_KEY in plugin_config and PWD_KEY in plugin_config:
+        return plugin_config[USER_KEY], plugin_config[PWD_KEY]
+    elif plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
+        is_serverless = False
+        # OpenSearch Serverless uses a different service name
+        if AWS_CONFIG_KEY in plugin_config:
+            aws_config = plugin_config[AWS_CONFIG_KEY]
+            if isinstance(aws_config, dict) and aws_config.get(IS_SERVERLESS_KEY, False):
+                is_serverless = True
+        region = get_aws_region(plugin_config)
+        return get_aws_sigv4_auth(region, is_serverless)
+    return None
+
+
+def get_endpoint_info_from_plugin_config(plugin_config: dict) -> EndpointInfo:
+    # verify boolean will be the inverse of the insecure SSL key, if present
+    should_verify = not is_insecure(plugin_config)
+    return EndpointInfo(__get_url(plugin_config), get_auth(plugin_config), should_verify)
+
+
+def get_endpoint_info_from_pipeline_config(pipeline_config: dict, section_key: str) -> EndpointInfo:
+    # Raises a ValueError if no supported endpoints are found
+    plugin_name, plugin_config = get_supported_endpoint_config(pipeline_config, section_key)
+    if HOSTS_KEY not in plugin_config:
+        raise ValueError("No hosts defined for plugin: " + plugin_name)
+    # Raises a ValueError if there is an error in the auth configuration
+    validate_auth(plugin_name, plugin_config)
+    return get_endpoint_info_from_plugin_config(plugin_config)
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
new file mode 100644
index 000000000..5e980ba22
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
@@ -0,0 +1,44 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+import utils
+from index_operations import SETTINGS_KEY, MAPPINGS_KEY
+
+
+# Computes and captures differences in indices between a "source" cluster
+# and a "target" cluster. Indices that exist on the source cluster but not
+# on the target cluster are considered "to-create". "Conflicting" indices
+# are present on both source and target clusters, but differ in their index
+# settings or mappings.
+class IndexDiff:
+    indices_to_create: set
+    identical_indices: set
+    identical_empty_indices: set
+    conflicting_indices: set
+
+    def __init__(self, source: dict, target: dict):
+        self.identical_empty_indices = set()
+        self.conflicting_indices = set()
+        # Compute index names that are present in both the source and target
+        indices_intersection = set(source.keys()) & set(target.keys())
+        # Check if these "common" indices are identical or have metadata conflicts
+        for index in indices_intersection:
+            # Check settings
+            if utils.has_differences(SETTINGS_KEY, source[index], target[index]):
+                self.conflicting_indices.add(index)
+            # Check mappings
+            if utils.has_differences(MAPPINGS_KEY, source[index], target[index]):
+                self.conflicting_indices.add(index)
+        # Identical indices are the subset that do not have metadata conflicts
+        self.identical_indices = set(indices_intersection) - set(self.conflicting_indices)
+        # Indices that are not already on the target need to be created
+        self.indices_to_create = set(source.keys()) - set(indices_intersection)
+
+    def set_identical_empty_indices(self, indices: set):
+        self.identical_empty_indices = indices
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
new file mode 100644
index 000000000..c4323966b
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
@@ -0,0 +1,17 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from dataclasses import dataclass
+
+
+# Captures the doc_count for indices in a cluster, and also computes a total
+@dataclass
+class IndexDocCount:
+    total: int
+    index_doc_count_map: dict
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py
new file mode 100644
index 000000000..9c3ed54ea
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py
@@ -0,0 +1,106 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from typing import Optional
+
+import jsonpath_ng
+import requests
+
+from endpoint_info import EndpointInfo
+from index_doc_count import IndexDocCount
+
+# Constants
+SETTINGS_KEY = "settings"
+MAPPINGS_KEY = "mappings"
+ALIASES_KEY = "aliases"
+COUNT_KEY = "count"
+__INDEX_KEY = "index"
+__ALL_INDICES_ENDPOINT = "*"
+__SEARCH_COUNT_PATH = "/_search?size=0"
+__SEARCH_COUNT_PAYLOAD = {"aggs": {"count": {"terms": {"field": "_index"}}}}
+__TOTAL_COUNT_JSONPATH = jsonpath_ng.parse("$.hits.total.value")
+__INDEX_COUNT_JSONPATH = jsonpath_ng.parse("$.aggregations.count.buckets")
+__BUCKET_INDEX_NAME_KEY = "key"
+__BUCKET_DOC_COUNT_KEY = "doc_count"
+__INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"]
+__TIMEOUT_SECONDS = 10
+
+
+def __send_get_request(url: str, endpoint: EndpointInfo, payload: Optional[dict] = None) -> requests.Response:
+    try:
+        resp = requests.get(url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(), json=payload,
+                            timeout=__TIMEOUT_SECONDS)
+        resp.raise_for_status()
+        return resp
+    except requests.ConnectionError:
+        raise RuntimeError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}")
+    except requests.HTTPError as e:
+        raise RuntimeError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()} - {e!s}")
+    except requests.Timeout:
+        # TODO retry mechanism
+        raise RuntimeError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}")
+    except requests.exceptions.RequestException as e:
+        raise RuntimeError(f"GET request failure to cluster endpoint: {endpoint.get_url()} - {e!s}")
+
+
+def fetch_all_indices(endpoint: EndpointInfo) -> dict:
+    all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT)
+    try:
+        # raises RuntimeError in case of any request errors
+        resp = __send_get_request(all_indices_url, endpoint)
+        result = dict(resp.json())
+        for index in list(result.keys()):
+            # Remove system indices
+            if index.startswith("."):
+                del result[index]
+            # Remove internal settings
+            else:
+                for setting in __INTERNAL_SETTINGS_KEYS:
+                    index_settings = result[index][SETTINGS_KEY]
+                    if __INDEX_KEY in index_settings:
+                        index_settings[__INDEX_KEY].pop(setting, None)
+        return result
+    except RuntimeError as e:
+        raise RuntimeError(f"Failed to fetch metadata from cluster endpoint: {e!s}")
+
+
+def create_indices(indices_data: dict, endpoint: EndpointInfo) -> dict:
+    failed_indices = dict()
+    for index in indices_data:
+        index_endpoint = endpoint.add_path(index)
+        data_dict = dict()
+        data_dict[SETTINGS_KEY] = indices_data[index][SETTINGS_KEY]
+        data_dict[MAPPINGS_KEY] = indices_data[index][MAPPINGS_KEY]
+        if ALIASES_KEY in indices_data[index] and len(indices_data[index][ALIASES_KEY]) > 0:
+            data_dict[ALIASES_KEY] = indices_data[index][ALIASES_KEY]
+        try:
+            resp = requests.put(index_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
+                                json=data_dict, timeout=__TIMEOUT_SECONDS)
+            resp.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            failed_indices[index] = e
+    # Loop completed, return failures if any
+    return failed_indices
+
+
+def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
+    count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH
+    doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
+    try:
+        # raises RuntimeError in case of any request errors
+        resp = __send_get_request(doc_count_endpoint, endpoint, __SEARCH_COUNT_PAYLOAD)
+        result = dict(resp.json())
+        total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value
+        counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value
+        count_map = dict()
+        for entry in counts_list:
+            count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY]
+        return IndexDocCount(total, count_map)
+    except RuntimeError as e:
+        raise RuntimeError(f"Failed to fetch doc_count: {e!s}")
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py
new file mode 100644
index 000000000..aaff51cc9
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py
@@ -0,0 +1,140 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+import argparse
+import logging
+
+import yaml
+
+import endpoint_utils
+import index_operations
+import utils
+from index_diff import IndexDiff
+from metadata_migration_params import MetadataMigrationParams
+from metadata_migration_result import MetadataMigrationResult
+
+# Constants
+INDICES_KEY = "indices"
+INCLUDE_KEY = "include"
+INDEX_NAME_KEY = "index_name_regex"
+
+
+def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str):
+    pipeline_config = next(iter(yaml_data.values()))
+    # Result is a tuple of (type, config)
+    source_config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1]
+    source_indices = source_config.get(INDICES_KEY, dict())
+    included_indices = source_indices.get(INCLUDE_KEY, list())
+    for index in indices_to_migrate:
+        included_indices.append({INDEX_NAME_KEY: index})
+    source_indices[INCLUDE_KEY] = included_indices
+    source_config[INDICES_KEY] = source_indices
+    with open(output_path, 'w') as out_file:
+        yaml.dump(yaml_data, out_file)
+
+
+def print_report(diff: IndexDiff, total_doc_count: int):  # pragma no cover
+    logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices))
+    logging.info("Identical empty indices in the target cluster (will be migrated): " +
+                 utils.string_from_set(diff.identical_empty_indices))
+    logging.info("Indices present in both clusters with conflicting settings/mappings (will NOT be migrated): " +
+                 utils.string_from_set(diff.conflicting_indices))
+    logging.info("Indices to be created in the target cluster (will be migrated): " +
+                 utils.string_from_set(diff.indices_to_create))
+    logging.info("Target document count: " + str(total_doc_count))
+
+
+def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
+    # Sanity check
+    if not args.report and len(args.output_file) == 0:
+        raise ValueError("No output file specified")
+    # Parse and validate pipelines YAML file
+    with open(args.config_file_path, 'r') as pipeline_file:
+        dp_config = yaml.safe_load(pipeline_file)
+    # We expect the Data Prepper pipeline to only have a single top-level value
+    pipeline_config = next(iter(dp_config.values()))
+    # Raises a ValueError if source or sink definitions are missing
+    endpoint_utils.validate_pipeline(pipeline_config)
+    source_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
+                                                                                 endpoint_utils.SOURCE_KEY)
+    target_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
+                                                                                 endpoint_utils.SINK_KEY)
+    result = MetadataMigrationResult()
+    # Fetch indices
+    source_indices = index_operations.fetch_all_indices(source_endpoint_info)
+    # If source indices is empty, return immediately
+    if len(source_indices.keys()) == 0:
+        return result
+    target_indices = index_operations.fetch_all_indices(target_endpoint_info)
+    # Compute index differences and print report
+    diff = IndexDiff(source_indices, target_indices)
+    if diff.identical_indices:
+        # Identical indices with zero documents on the target are eligible for migration
+        target_doc_count = index_operations.doc_count(diff.identical_indices, target_endpoint_info)
+        # doc_count only returns indices that have non-zero counts, so the difference in responses
+        # gives us the set of identical, empty indices
+        result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys())
+        diff.set_identical_empty_indices(result.migration_indices)
+    if diff.indices_to_create:
+        result.migration_indices.update(diff.indices_to_create)
+    if result.migration_indices:
+        doc_count_result = index_operations.doc_count(result.migration_indices, source_endpoint_info)
+        result.target_doc_count = doc_count_result.total
+    if args.report:
+        print_report(diff, result.target_doc_count)
+    if result.migration_indices:
+        # Write output YAML
+        if len(args.output_file) > 0:
+            write_output(dp_config, result.migration_indices, args.output_file)
+            logging.debug("Wrote output YAML pipeline to: " + args.output_file)
+        if not args.dryrun:
+            index_data = dict()
+            for index_name in diff.indices_to_create:
+                index_data[index_name] = source_indices[index_name]
+            failed_indices = index_operations.create_indices(index_data, target_endpoint_info)
+            fail_count = len(failed_indices)
+            if fail_count > 0:
+                logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
+                for failed_index_name, error in failed_indices.items():
+                    logging.error(f"Index name {failed_index_name} failed: {error!s}")
+                raise RuntimeError("Metadata migration failed, index creation unsuccessful")
+    return result
+
+
+if __name__ == '__main__':  # pragma no cover
+    # Set up parsing for command line arguments
+    arg_parser = argparse.ArgumentParser(
+        prog="python metadata_migration.py",
+        description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" +
+                    "The first input to the tool is a path to a Data Prepper pipeline YAML file, which is parsed to obtain " +
+                    "the source and target cluster endpoints.\nThe second input is an output path to which a modified version " +
+                    "of the pipeline YAML file is written. This version of the pipeline adds an index inclusion configuration " +
+                    "to the sink, specifying only those indices that were created by the index configuration tool.\nThis tool " +
+                    "can also print a report based on the indices in the source cluster, indicating which ones will be created, " +
+                    "along with indices that are identical or have conflicting settings/mappings.",
+        formatter_class=argparse.RawTextHelpFormatter
+    )
+    # Required positional argument
+    arg_parser.add_argument(
+        "config_file_path",
+        help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information"
+    )
+    # Optional positional argument
+    arg_parser.add_argument(
+        "output_file",
+        nargs='?', default="",
+        help="Output path for the Data Prepper pipeline YAML file that will be generated"
+    )
+    # Flags
+    arg_parser.add_argument("--report", "-r", action="store_true",
+                            help="Print a report of the index differences")
+    arg_parser.add_argument("--dryrun", action="store_true",
+                            help="Skips the actual creation of indices on the target cluster")
+    namespace = arg_parser.parse_args()
+    run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py
new file mode 100644
index 000000000..a04bfd612
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py
@@ -0,0 +1,18 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from dataclasses import dataclass
+
+
+@dataclass
+class MetadataMigrationParams:
+    config_file_path: str
+    output_file: str = ""
+    report: bool = False
+    dryrun: bool = False
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py
new file mode 100644
index 000000000..27be1dafc
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py
@@ -0,0 +1,17 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from dataclasses import dataclass, field
+
+
+@dataclass
+class MetadataMigrationResult:
+    target_doc_count: int = 0
+    # Set of indices for which data needs to be migrated
+    migration_indices: set = field(default_factory=set)
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py
new file mode 100644
index 000000000..4681f71ed
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py
@@ -0,0 +1,60 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+import argparse
+import os
+import yaml
+import endpoint_utils
+
+if __name__ == '__main__':
+    # Set up parsing for command line arguments
+    arg_parser = argparse.ArgumentParser(
+        prog="python osi_data_migration.py",
+        description="Launches an AWS OpenSearch Ingestion pipeline given an input pipeline"
+    )
+    # Required positional argument
+    arg_parser.add_argument(
+        "input",
+        help="Path to the input Data Prepper pipeline YAML file"
+    )
+    arg_parser.add_argument(
+        "output",
+        help="Path to which the output OSI pipeline YAML will be written"
+    )
+    namespace = arg_parser.parse_args()
+    with open(namespace.input, 'r') as input:
+        dp_config = yaml.safe_load(input)
+    # We expect the Data Prepper pipeline to only have a single top-level value
+    pipeline_config = next(iter(dp_config.values()))
+    # Remove options that are unsupported by OSI
+    if "workers" in pipeline_config:
+        del pipeline_config["workers"]
+    if "delay" in pipeline_config:
+        del pipeline_config["delay"]
+    # Add OSI version spec to the top-level dict
+    dp_config["version"] = "2"
+
+    # Remove options from source and sink that are unsupported by OSI
+    # and add AWS config instead
+    config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1]
+    # Fargate stores the current region in the AWS_REGION env var
+    region: str = os.environ.get("AWS_REGION")
+    if "disable_authentication" in config:
+        del config["disable_authentication"]
+    config["aws"] = {"region": region}
+
+    config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SINK_KEY)[1]
+    if "disable_authentication" in config:
+        del config["disable_authentication"]
+    config["aws"] = {"region": region}
+
+    # Write OSI pipeline config to output file
+    with open(namespace.output, 'w') as out_file:
+        yaml.dump(dp_config, out_file)
+
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt
new file mode 100644
index 000000000..b9352bd73
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt
@@ -0,0 +1,8 @@
+botocore>=1.31.70
+jsondiff>=2.0.0
+jsonpath-ng>=1.6.0
+prometheus-client>=0.17.1
+pyyaml>=6.0.1
+requests>=2.31.0
+requests-aws4auth>=1.2.3
+responses>=0.23.3
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py
new file mode 100644
index 000000000..f570edb63
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py
@@ -0,0 +1,31 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+from jsondiff import diff
+
+
+# Utility method to make a comma-separated string from a set.
+# If the set is empty, "[]" is returned for clarity.
+def string_from_set(s: set[str]) -> str:
+    result = "["
+    if s:
+        result += ", ".join(s)
+    return result + "]"
+
+
+# Utility method to compare the JSON contents of a key in two dicts.
+# This method handles checking if the key exists in either dict.
+def has_differences(key: str, dict1: dict, dict2: dict) -> bool:
+    if key not in dict1 and key not in dict2:
+        return False
+    elif key in dict1 and key in dict2:
+        data_diff = diff(dict1[key], dict2[key])
+        return bool(data_diff)
+    else:
+        return True
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh
new file mode 100644
index 000000000..2b356d693
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+
+# Start by clearing any previous state
+if [ -e /tmp/user_pipeline ]; then
+    rm /tmp/user_pipeline
+fi
+if [ -e /tmp/temp_pipeline ]; then
+    rm /tmp/temp_pipeline
+fi
+
+# Ensure target cluster endpoint is available as an env var
+if [ -z "$MIGRATION_DOMAIN_ENDPOINT" ]; then
+    echo "MIGRATION_DOMAIN_ENDPOINT environment variable not found for target cluster endpoint, exiting..."
+    exit 1
+fi
+
+# Default values
+secret_name="dev-default-fetch-migration-pipelineConfig"
+
+# Override default values with optional command-line arguments
+while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+        --secret)
+            secret_name="$2"
+            shift
+            shift
+            ;;
+        *)
+            shift
+            ;;
+    esac
+done
+
+# Get pipeline config from secrets manager
+pipeline_config=`aws secretsmanager get-secret-value --secret-id $secret_name | jq -r '.SecretString' | base64 -d`
+# Replace target cluster placeholder with actual endpoint value (NOTE: the placeholder token below is an assumption and must match the token stored in the pipeline secret)
+pipeline_config=${pipeline_config//"<TARGET_CLUSTER_ENDPOINT_PLACEHOLDER>"/$MIGRATION_DOMAIN_ENDPOINT}
+# Write output to temp file for use by metadata migration
+cat <<<$pipeline_config > /tmp/user_pipeline
+# Setup and run metadata migration
+cd python/
+pip3 install --user -r requirements.txt
+# TODO - Configure Python logs to output to console
+python3 metadata_migration.py -r /tmp/user_pipeline /tmp/temp_pipeline
+python3 osi_data_migration.py /tmp/temp_pipeline /tmp/osi_pipeline.yaml
+cd ..
+cat /tmp/osi_pipeline.yaml
+# TODO Add role config to OSI pipeline before we can create the pipeline from here
+#aws osis create-pipeline --pipeline-name fetch-migration --min-units 1 --max-units 1 --pipeline-configuration-body /tmp/osi_pipeline.yaml
+
+# Clean up state
+if [ -e /tmp/user_pipeline ]; then
+    rm /tmp/user_pipeline
+fi
+if [ -e /tmp/osi_pipeline.yaml ]; then
+    rm /tmp/osi_pipeline.yaml
+fi
diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
index ea28d3f23..2dfee784d 100644
--- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
@@ -118,8 +118,17 @@ export class MigrationConsoleStack extends MigrationServiceCore {
                     "iam:PassRole"
                 ]
             })
+            // [POC] Enable Migration Console to fetch pipeline from Secrets Manager
+            const osiMigrationGetSecretPolicy = new PolicyStatement({
+                effect: Effect.ALLOW,
+                resources: [`arn:aws:secretsmanager:${props.env?.region}:${props.env?.account}:secret:${props.stage}-${props.defaultDeployId}-fetch-migration-pipelineConfig-*`],
+                actions: [
+                    "secretsmanager:GetSecretValue"
+                ]
+            })
             taskRolePolicies.push(fetchMigrationTaskRunPolicy)
             taskRolePolicies.push(fetchMigrationPassRolePolicy)
+            taskRolePolicies.push(osiMigrationGetSecretPolicy)
         }
 
         this.createService({