diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile index 69186c587..be3aa68cb 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile @@ -14,12 +14,18 @@ COPY humanReadableLogs.py /root/ COPY catIndices.sh /root/ COPY showFetchMigrationCommand.sh /root/ COPY setupIntegTests.sh /root/ +COPY runOSIHistoricalDataMigration.sh /root/ COPY msk-iam-auth.properties /root/kafka-tools/aws COPY kafkaCmdRef.md /root/kafka-tools RUN chmod ug+x /root/runTestBenchmarks.sh RUN chmod ug+x /root/humanReadableLogs.py RUN chmod ug+x /root/catIndices.sh RUN chmod ug+x /root/showFetchMigrationCommand.sh +RUN chmod ug+x /root/runOSIHistoricalDataMigration.sh +COPY python/ /root/python/ +# TODO - Installing here messes with OSB's botocore +#WORKDIR /root/python +#RUN pip3 install --user -r requirements.txt WORKDIR /root/kafka-tools # Get kafka distribution and unpack to 'kafka' RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py new file mode 100644 index 000000000..4f06c7427 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py @@ -0,0 +1,50 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from typing import Union + +from requests_aws4auth import AWS4Auth + + +# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster +class EndpointInfo: + # Private member variables + __url: str + # "|" operator is only supported in 3.10+ + __auth: Union[AWS4Auth, tuple, None] + __verify_ssl: bool + + def __init__(self, url: str, auth: Union[AWS4Auth, tuple, None] = None, verify_ssl: bool = True): + self.__url = url + # Normalize url value to have trailing slash + if not url.endswith("/"): + self.__url += "/" + self.__auth = auth + self.__verify_ssl = verify_ssl + + def __eq__(self, obj): + return isinstance(obj, EndpointInfo) and \ + self.__url == obj.__url and \ + self.__auth == obj.__auth and \ + self.__verify_ssl == obj.__verify_ssl + + def add_path(self, path: str) -> str: + # Remove leading slash if present + if path.startswith("/"): + path = path[1:] + return self.__url + path + + def get_url(self) -> str: + return self.__url + + def get_auth(self) -> Union[AWS4Auth, tuple, None]: + return self.__auth + + def is_verify_ssl(self) -> bool: + return self.__verify_ssl diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py new file mode 100644 index 000000000..3b1b3a8ae --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py @@ -0,0 +1,164 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+#
+
+import re
+from typing import Optional, Union
+
+from requests_aws4auth import AWS4Auth
+from botocore.session import Session
+
+from endpoint_info import EndpointInfo
+
+# Constants
+SOURCE_KEY = "source"
+SINK_KEY = "sink"
+SUPPORTED_PLUGINS = ["opensearch", "elasticsearch"]
+HOSTS_KEY = "hosts"
+INSECURE_KEY = "insecure"
+CONNECTION_KEY = "connection"
+DISABLE_AUTH_KEY = "disable_authentication"
+USER_KEY = "username"
+PWD_KEY = "password"
+AWS_SIGV4_KEY = "aws_sigv4"
+AWS_REGION_KEY = "aws_region"
+AWS_CONFIG_KEY = "aws"
+AWS_CONFIG_REGION_KEY = "region"
+IS_SERVERLESS_KEY = "serverless"
+ES_SERVICE_NAME = "es"
+AOSS_SERVICE_NAME = "aoss"
+URL_REGION_PATTERN = re.compile(r"([\w-]*)\.(es|aoss)\.amazonaws\.com")
+
+
+def __get_url(plugin_config: dict) -> str:
+    # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
+    # This tool needs one accessible host, so we pick the first entry in the latter case.
+    return plugin_config[HOSTS_KEY][0] if isinstance(plugin_config[HOSTS_KEY], list) else plugin_config[HOSTS_KEY]
+
+
+# Helper function that attempts to extract the AWS region from a URL,
+# assuming it is of the form *.<region>.<service>.amazonaws.com
+def __derive_aws_region_from_url(url: str) -> Optional[str]:
+    match = URL_REGION_PATTERN.search(url)
+    if match:
+        # Index 0 returns the entire match, index 1 returns only the first group
+        return match.group(1)
+    return None
+
+
+def get_aws_region(plugin_config: dict) -> str:
+    if plugin_config.get(AWS_SIGV4_KEY, False) and plugin_config.get(AWS_REGION_KEY, None) is not None:
+        return plugin_config[AWS_REGION_KEY]
+    elif plugin_config.get(AWS_CONFIG_KEY, None) is not None:
+        aws_config = plugin_config[AWS_CONFIG_KEY]
+        if not isinstance(aws_config, dict):
+            raise ValueError("Unexpected value for 'aws' configuration")
+        elif aws_config.get(AWS_CONFIG_REGION_KEY, None) is not None:
+            return aws_config[AWS_CONFIG_REGION_KEY]
+    # Region not explicitly defined, attempt to derive from URL
+    derived_region = __derive_aws_region_from_url(__get_url(plugin_config))
+    if derived_region is None:
+        raise ValueError("No region configured for AWS SigV4 auth, or derivable from host URL")
+    return derived_region
+
+
+def __check_supported_endpoint(section_config: dict) -> Optional[tuple]:
+    for supported_type in SUPPORTED_PLUGINS:
+        if supported_type in section_config:
+            return supported_type, section_config[supported_type]
+
+
+# This config key may be either directly in the main dict (for sink)
+# or inside a nested dict (for source). The default value is False.
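+# For example (hypothetical snippets), a sink plugin may set it directly:
+#   opensearch: { hosts: [...], insecure: true }
+# while a source plugin would nest it under "connection":
+#   opensearch: { hosts: [...], connection: { insecure: true } }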
+def is_insecure(plugin_config: dict) -> bool: + if INSECURE_KEY in plugin_config: + return plugin_config[INSECURE_KEY] + elif CONNECTION_KEY in plugin_config and INSECURE_KEY in plugin_config[CONNECTION_KEY]: + return plugin_config[CONNECTION_KEY][INSECURE_KEY] + return False + + +def validate_pipeline(pipeline: dict): + if SOURCE_KEY not in pipeline: + raise ValueError("Missing source configuration in Data Prepper pipeline YAML") + if SINK_KEY not in pipeline: + raise ValueError("Missing sink configuration in Data Prepper pipeline YAML") + + +def validate_auth(plugin_name: str, plugin_config: dict): + # If auth is disabled, no further validation is required + if plugin_config.get(DISABLE_AUTH_KEY, False): + return + # If AWS SigV4 is configured, validate region + if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config: + # Raises a ValueError if region cannot be derived + get_aws_region(plugin_config) + # Validate basic auth + elif USER_KEY not in plugin_config: + raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name) + elif PWD_KEY not in plugin_config: + raise ValueError("Invalid auth configuration (no password for username) for plugin: " + plugin_name) + + +def get_supported_endpoint_config(pipeline_config: dict, section_key: str) -> tuple: + # The value of each key may be a single plugin (as a dict) or a list of plugin configs + supported_tuple = tuple() + if isinstance(pipeline_config[section_key], dict): + supported_tuple = __check_supported_endpoint(pipeline_config[section_key]) + elif isinstance(pipeline_config[section_key], list): + for entry in pipeline_config[section_key]: + supported_tuple = __check_supported_endpoint(entry) + # Break out of the loop at the first supported type + if supported_tuple: + break + if not supported_tuple: + raise ValueError("Could not find any supported endpoints in section: " + section_key) + # First tuple value is the plugin name, second value is the plugin config dict + return supported_tuple[0], supported_tuple[1] + + +def get_aws_sigv4_auth(region: str, is_serverless: bool = False) -> AWS4Auth: + credentials = Session().get_credentials() + if not credentials: + raise ValueError("Unable to fetch AWS session credentials for SigV4 auth") + if is_serverless: + return AWS4Auth(region=region, service=AOSS_SERVICE_NAME, refreshable_credentials=credentials) + else: + return AWS4Auth(region=region, service=ES_SERVICE_NAME, refreshable_credentials=credentials) + + +def get_auth(plugin_config: dict) -> Union[AWS4Auth, tuple, None]: + # Basic auth + if USER_KEY in plugin_config and PWD_KEY in plugin_config: + return plugin_config[USER_KEY], plugin_config[PWD_KEY] + elif plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config: + is_serverless = False + # OpenSearch Serverless uses a different service name + if AWS_CONFIG_KEY in plugin_config: + aws_config = plugin_config[AWS_CONFIG_KEY] + if isinstance(aws_config, dict) and aws_config.get(IS_SERVERLESS_KEY, False): + is_serverless = True + region = get_aws_region(plugin_config) + return get_aws_sigv4_auth(region, is_serverless) + return None + + +def get_endpoint_info_from_plugin_config(plugin_config: dict) -> EndpointInfo: + # verify boolean will be the inverse of the insecure SSL key, if present + should_verify = not is_insecure(plugin_config) + return EndpointInfo(__get_url(plugin_config), get_auth(plugin_config), should_verify) + + +def get_endpoint_info_from_pipeline_config(pipeline_config: dict, section_key: str) -> 
EndpointInfo:
+    # Raises a ValueError if no supported endpoints are found
+    plugin_name, plugin_config = get_supported_endpoint_config(pipeline_config, section_key)
+    if HOSTS_KEY not in plugin_config:
+        raise ValueError("No hosts defined for plugin: " + plugin_name)
+    # Raises a ValueError if there is an error in the auth configuration
+    validate_auth(plugin_name, plugin_config)
+    return get_endpoint_info_from_plugin_config(plugin_config)
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
new file mode 100644
index 000000000..5e980ba22
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
@@ -0,0 +1,44 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+
+import utils
+from index_operations import SETTINGS_KEY, MAPPINGS_KEY
+
+
+# Computes and captures differences in indices between a "source" cluster
+# and a "target" cluster. Indices that exist on the source cluster but not
+# on the target cluster are considered "to-create". "Conflicting" indices
+# are present on both source and target clusters, but differ in their index
+# settings or mappings.
+class IndexDiff:
+    indices_to_create: set
+    identical_indices: set
+    identical_empty_indices: set
+    conflicting_indices: set
+
+    def __init__(self, source: dict, target: dict):
+        self.identical_empty_indices = set()
+        self.conflicting_indices = set()
+        # Compute index names that are present in both the source and target
+        indices_intersection = set(source.keys()) & set(target.keys())
+        # Check if these "common" indices are identical or have metadata conflicts
+        for index in indices_intersection:
+            # Check settings
+            if utils.has_differences(SETTINGS_KEY, source[index], target[index]):
+                self.conflicting_indices.add(index)
+            # Check mappings
+            if utils.has_differences(MAPPINGS_KEY, source[index], target[index]):
+                self.conflicting_indices.add(index)
+        # Identical indices are the subset that do not have metadata conflicts
+        self.identical_indices = set(indices_intersection) - set(self.conflicting_indices)
+        # Indices that are not already on the target need to be created
+        self.indices_to_create = set(source.keys()) - set(indices_intersection)
+
+    def set_identical_empty_indices(self, indices: set):
+        self.identical_empty_indices = indices
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
new file mode 100644
index 000000000..c4323966b
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
@@ -0,0 +1,17 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# + +from dataclasses import dataclass + + +# Captures the doc_count for indices in a cluster, and also computes a total +@dataclass +class IndexDocCount: + total: int + index_doc_count_map: dict diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py new file mode 100644 index 000000000..9c3ed54ea --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_operations.py @@ -0,0 +1,106 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from typing import Optional + +import jsonpath_ng +import requests + +from endpoint_info import EndpointInfo +from index_doc_count import IndexDocCount + +# Constants +SETTINGS_KEY = "settings" +MAPPINGS_KEY = "mappings" +ALIASES_KEY = "aliases" +COUNT_KEY = "count" +__INDEX_KEY = "index" +__ALL_INDICES_ENDPOINT = "*" +__SEARCH_COUNT_PATH = "/_search?size=0" +__SEARCH_COUNT_PAYLOAD = {"aggs": {"count": {"terms": {"field": "_index"}}}} +__TOTAL_COUNT_JSONPATH = jsonpath_ng.parse("$.hits.total.value") +__INDEX_COUNT_JSONPATH = jsonpath_ng.parse("$.aggregations.count.buckets") +__BUCKET_INDEX_NAME_KEY = "key" +__BUCKET_DOC_COUNT_KEY = "doc_count" +__INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"] +__TIMEOUT_SECONDS = 10 + + +def __send_get_request(url: str, endpoint: EndpointInfo, payload: Optional[dict] = None) -> requests.Response: + try: + resp = requests.get(url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(), json=payload, + timeout=__TIMEOUT_SECONDS) + resp.raise_for_status() + return resp + except requests.ConnectionError: + raise RuntimeError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}") + except requests.HTTPError as e: + raise RuntimeError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()} - {e!s}") + except requests.Timeout: + # TODO retry mechanism + raise RuntimeError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}") + except requests.exceptions.RequestException as e: + raise RuntimeError(f"GET request failure to cluster endpoint: {endpoint.get_url()} - {e!s}") + + +def fetch_all_indices(endpoint: EndpointInfo) -> dict: + all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT) + try: + # raises RuntimeError in case of any request errors + resp = __send_get_request(all_indices_url, endpoint) + result = dict(resp.json()) + for index in list(result.keys()): + # Remove system indices + if index.startswith("."): + del result[index] + # Remove internal settings + else: + for setting in __INTERNAL_SETTINGS_KEYS: + index_settings = result[index][SETTINGS_KEY] + if __INDEX_KEY in index_settings: + index_settings[__INDEX_KEY].pop(setting, None) + return result + except RuntimeError as e: + raise RuntimeError(f"Failed to fetch metadata from cluster endpoint: {e!s}") + + +def create_indices(indices_data: dict, endpoint: EndpointInfo) -> dict: + failed_indices = dict() + for index in indices_data: + index_endpoint = endpoint.add_path(index) + data_dict = dict() + data_dict[SETTINGS_KEY] = indices_data[index][SETTINGS_KEY] + data_dict[MAPPINGS_KEY] = indices_data[index][MAPPINGS_KEY] + if ALIASES_KEY in indices_data[index] and len(indices_data[index][ALIASES_KEY]) > 0: + 
data_dict[ALIASES_KEY] = indices_data[index][ALIASES_KEY] + try: + resp = requests.put(index_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(), + json=data_dict, timeout=__TIMEOUT_SECONDS) + resp.raise_for_status() + except requests.exceptions.RequestException as e: + failed_indices[index] = e + # Loop completed, return failures if any + return failed_indices + + +def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount: + count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH + doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix) + try: + # raises RuntimeError in case of any request errors + resp = __send_get_request(doc_count_endpoint, endpoint, __SEARCH_COUNT_PAYLOAD) + result = dict(resp.json()) + total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value + counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value + count_map = dict() + for entry in counts_list: + count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY] + return IndexDocCount(total, count_map) + except RuntimeError as e: + raise RuntimeError(f"Failed to fetch doc_count: {e!s}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py new file mode 100644 index 000000000..0890a606d --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration.py @@ -0,0 +1,142 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import argparse +import logging + +import yaml + +import endpoint_utils +import index_operations +import utils +from index_diff import IndexDiff +from metadata_migration_params import MetadataMigrationParams +from metadata_migration_result import MetadataMigrationResult + +# Constants +INDICES_KEY = "indices" +INCLUDE_KEY = "include" +INDEX_NAME_KEY = "index_name_regex" + + +def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str): + pipeline_config = next(iter(yaml_data.values())) + # Result is a tuple of (type, config) + source_config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1] + source_indices = source_config.get(INDICES_KEY, dict()) + included_indices = source_indices.get(INCLUDE_KEY, list()) + for index in indices_to_migrate: + included_indices.append({INDEX_NAME_KEY: index}) + source_indices[INCLUDE_KEY] = included_indices + source_config[INDICES_KEY] = source_indices + with open(output_path, 'w') as out_file: + yaml.dump(yaml_data, out_file) + + +def print_report(diff: IndexDiff, total_doc_count: int): # pragma no cover + logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices)) + logging.info("Identical empty indices in the target cluster (will be migrated): " + + utils.string_from_set(diff.identical_empty_indices)) + logging.info("Indices present in both clusters with conflicting settings/mappings (will NOT be migrated): " + + utils.string_from_set(diff.conflicting_indices)) + logging.info("Indices to be created in the target cluster (will be migrated): " + + utils.string_from_set(diff.indices_to_create)) + logging.info("Target document count: " + str(total_doc_count)) + + +def run(args: MetadataMigrationParams) -> 
MetadataMigrationResult: + # Sanity check + if not args.report and len(args.output_file) == 0: + raise ValueError("No output file specified") + # Parse and validate pipelines YAML file + with open(args.config_file_path, 'r') as pipeline_file: + dp_config = yaml.safe_load(pipeline_file) + # We expect the Data Prepper pipeline to only have a single top-level value + pipeline_config = next(iter(dp_config.values())) + # Raises a ValueError if source or sink definitions are missing + endpoint_utils.validate_pipeline(pipeline_config) + source_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config, + endpoint_utils.SOURCE_KEY) + target_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config, + endpoint_utils.SINK_KEY) + result = MetadataMigrationResult() + # Fetch indices + source_indices = index_operations.fetch_all_indices(source_endpoint_info) + # If source indices is empty, return immediately + if len(source_indices.keys()) == 0: + return result + target_indices = index_operations.fetch_all_indices(target_endpoint_info) + # Compute index differences and print report + diff = IndexDiff(source_indices, target_indices) + if diff.identical_indices: + # Identical indices with zero documents on the target are eligible for migration + target_doc_count = index_operations.doc_count(diff.identical_indices, target_endpoint_info) + # doc_count only returns indices that have non-zero counts, so the difference in responses + # gives us the set of identical, empty indices + result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys()) + diff.set_identical_empty_indices(result.migration_indices) + if diff.indices_to_create: + result.migration_indices.update(diff.indices_to_create) + if result.migration_indices: + doc_count_result = index_operations.doc_count(result.migration_indices, source_endpoint_info) + result.target_doc_count = doc_count_result.total + if args.report: + print_report(diff, result.target_doc_count) + if result.migration_indices: + # Write output YAML + if len(args.output_file) > 0: + write_output(dp_config, result.migration_indices, args.output_file) + logging.debug("Wrote output YAML pipeline to: " + args.output_file) + if not args.dryrun: + index_data = dict() + for index_name in diff.indices_to_create: + index_data[index_name] = source_indices[index_name] + failed_indices = index_operations.create_indices(index_data, target_endpoint_info) + fail_count = len(failed_indices) + if fail_count > 0: + logging.error(f"Failed to create {fail_count} of {len(index_data)} indices") + for failed_index_name, error in failed_indices.items(): + logging.error(f"Index name {failed_index_name} failed: {error!s}") + raise RuntimeError("Metadata migration failed, index creation unsuccessful") + return result + + +if __name__ == '__main__': # pragma no cover + # Set log level + logging.basicConfig(level=logging.INFO) + # Set up parsing for command line arguments + arg_parser = argparse.ArgumentParser( + prog="python metadata_migration.py", + description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" + + "The first input to the tool is a path to a Data Prepper pipeline YAML file, which is parsed to obtain " + + "the source and target cluster endpoints.\nThe second input is an output path to which a modified version " + + "of the pipeline YAML file is written. 
This version of the pipeline adds an index inclusion configuration " + + "to the sink, specifying only those indices that were created by the index configuration tool.\nThis tool " + + "can also print a report based on the indices in the source cluster, indicating which ones will be created, " + + "along with indices that are identical or have conflicting settings/mappings.", + formatter_class=argparse.RawTextHelpFormatter + ) + # Required positional argument + arg_parser.add_argument( + "config_file_path", + help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information" + ) + # Optional positional argument + arg_parser.add_argument( + "output_file", + nargs='?', default="", + help="Output path for the Data Prepper pipeline YAML file that will be generated" + ) + # Flags + arg_parser.add_argument("--report", "-r", action="store_true", + help="Print a report of the index differences") + arg_parser.add_argument("--dryrun", action="store_true", + help="Skips the actual creation of indices on the target cluster") + namespace = arg_parser.parse_args() + run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun)) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py new file mode 100644 index 000000000..a04bfd612 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_params.py @@ -0,0 +1,18 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from dataclasses import dataclass + + +@dataclass +class MetadataMigrationParams: + config_file_path: str + output_file: str = "" + report: bool = False + dryrun: bool = False diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py new file mode 100644 index 000000000..27be1dafc --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/metadata_migration_result.py @@ -0,0 +1,17 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from dataclasses import dataclass, field + + +@dataclass +class MetadataMigrationResult: + target_doc_count: int = 0 + # Set of indices for which data needs to be migrated + migration_indices: set = field(default_factory=set) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py new file mode 100644 index 000000000..5c30f3215 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/osi_data_migration.py @@ -0,0 +1,64 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+# + +import argparse +import logging +import os +import yaml +import endpoint_utils + +if __name__ == '__main__': + # Set log level + logging.basicConfig(level=logging.INFO) + # Set up parsing for command line arguments + arg_parser = argparse.ArgumentParser( + prog="python osi_data_migration.py", + description="Launches an AWS Opensearch Ingestion pipeline given an input pipeline" + ) + # Required positional argument + arg_parser.add_argument( + "input", + help="Path to the input Data Prepper pipeline YAML file" + ) + arg_parser.add_argument( + "output", + help="Path to which the output OSI pipeline YAML will be written" + ) + namespace = arg_parser.parse_args() + with open(namespace.input, 'r') as input: + dp_config = yaml.safe_load(input) + # We expect the Data Prepper pipeline to only have a single top-level value + pipeline_config = next(iter(dp_config.values())) + # Remove options that are unsupported by OSI + if "workers" in pipeline_config: + del pipeline_config["workers"] + if "delay" in pipeline_config: + del pipeline_config["delay"] + # Add OSI version spec to the top-level dict + dp_config["version"] = "2" + + # Remove options from source and sink that are unsupported by OSI + # and add AWS config instead + config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1] + # Fargate stores the current region in the AWS_REGION env var + region: str = os.environ.get("AWS_REGION") + pipeline_role_arn: str = os.environ.get("OSIS_PIPELINE_ROLE_ARN") + if "disable_authentication" in config: + del config["disable_authentication"] + config["aws"] = {"region": region, "sts_role_arn": pipeline_role_arn} + + config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SINK_KEY)[1] + if "disable_authentication" in config: + del config["disable_authentication"] + config["aws"] = {"region": region, "sts_role_arn": pipeline_role_arn} + + # Write OSI pipeline config to output file + with open(namespace.output, 'w') as out_file: + yaml.dump(dp_config, out_file) + diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt new file mode 100644 index 000000000..b9352bd73 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/requirements.txt @@ -0,0 +1,8 @@ +botocore>=1.31.70 +jsondiff>=2.0.0 +jsonpath-ng>=1.6.0 +prometheus-client>=0.17.1 +pyyaml>=6.0.1 +requests>=2.31.0 +requests-aws4auth>=1.2.3 +responses>=0.23.3 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py new file mode 100644 index 000000000..f570edb63 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/utils.py @@ -0,0 +1,31 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from jsondiff import diff + + +# Utility method to make a comma-separated string from a set. +# If the set is empty, "[]" is returned for clarity. +def string_from_set(s: set[str]) -> str: + result = "[" + if s: + result += ", ".join(s) + return result + "]" + + +# Utility method to compare the JSON contents of a key in two dicts. +# This method handles checking if the key exists in either dict. 
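+# For example (hypothetical inputs):
+#   has_differences("settings", {"settings": {"a": 1}}, {"settings": {"a": 2}}) -> True
+#   has_differences("settings", {}, {}) -> False (key missing from both dicts)
+#   has_differences("settings", {"settings": {}}, {}) -> True (key present in only one dict)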
+def has_differences(key: str, dict1: dict, dict2: dict) -> bool: + if key not in dict1 and key not in dict2: + return False + elif key in dict1 and key in dict2: + data_diff = diff(dict1[key], dict2[key]) + return bool(data_diff) + else: + return True diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh new file mode 100644 index 000000000..f9006573d --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runOSIHistoricalDataMigration.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +# Start by clearing any previous state +if [ -e /tmp/user_pipeline ]; then + rm /tmp/user_pipeline +fi +if [ -e /tmp/temp_pipeline ]; then + rm /tmp/temp_pipeline +fi +if [ -e /tmp/osi_pipeline.yaml ]; then + rm /tmp/osi_pipeline.yaml +fi + +# Ensure target cluster endpoint is available as an env var +if [ -z "$MIGRATION_DOMAIN_ENDPOINT" ]; then + echo "MIGRATION_DOMAIN_ENDPOINT environment variable not found for target cluster endpoint, exiting..." + exit 1 +fi +# Ensure OSIS pipeline role ARN is available as an env var +if [ -z "$OSIS_PIPELINE_ROLE_ARN" ]; then + echo "OSIS_PIPELINE_ROLE_ARN environment variable not found for OSIS pipeline role, exiting..." + exit 1 +fi +# Ensure OSIS pipeline VPC options are available as an env var +if [ -z "$OSIS_PIPELINE_VPC_OPTIONS" ]; then + echo "OSIS_PIPELINE_VPC_OPTIONS environment variable not found, exiting..." + exit 1 +fi + +# Default values +secret_name="dev-default-fetch-migration-pipelineConfig" + +# Override default values with optional command-line arguments +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + --secret) + secret_name="$2" + shift + shift + ;; + *) + shift + ;; + esac +done + +# Get pipeline config from secrets manager +pipeline_config=`aws secretsmanager get-secret-value --secret-id $secret_name | jq -r '.SecretString' | base64 -d` +# Remove any port from target endpoint because OSIS doesn't allow it +target_endpoint=${MIGRATION_DOMAIN_ENDPOINT%:[0-9]*} +# Replace target cluster placeholder with actual endpoint value +pipeline_config=${pipeline_config//$target_endpoint} +# Write output to temp file for use by metadata migration +cat <<<$pipeline_config > /tmp/user_pipeline +# Setup and run metadata migration +cd python/ +pip3 install --user -r requirements.txt +# Run metadata migration +python3 metadata_migration.py -r /tmp/user_pipeline /tmp/temp_pipeline +# Parse output file from previous step to create OSI pipeline input +python3 osi_data_migration.py /tmp/temp_pipeline /tmp/osi_pipeline.yaml +cd .. 
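+
+# (Optional sketch) The generated OSI config could be sanity-checked before creating the
+# pipeline, e.g. via the OSIS ValidatePipeline API:
+#aws osis validate-pipeline --pipeline-configuration-body file:///tmp/osi_pipeline.yaml
+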
+# TODO Figure out why log publishing config via CLI fails
+#aws osis create-pipeline --pipeline-name osi-fetch-migration --min-units 1 --max-units 1 --pipeline-configuration-body file:///tmp/osi_pipeline.yaml --log-publishing-options IsLoggingEnabled=true,CloudWatchLogDestination={LogGroup=/aws/vendedlogs/OpenSearchService/pipelines/osi-fetch-migration} --vpc-options $OSIS_PIPELINE_VPC_OPTIONS
+aws osis create-pipeline --pipeline-name osi-fetch-migration --min-units 1 --max-units 1 --pipeline-configuration-body file:///tmp/osi_pipeline.yaml --vpc-options $OSIS_PIPELINE_VPC_OPTIONS
+
+# Clean up state
+if [ -e /tmp/user_pipeline ]; then
+    rm /tmp/user_pipeline
+fi
+if [ -e /tmp/temp_pipeline ]; then
+    rm /tmp/temp_pipeline
+fi
diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
index b9334d4f6..f4035cdf2 100644
--- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
+++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts
@@ -5,7 +5,7 @@ import {Construct} from "constructs";
 import {join} from "path";
 import {MigrationServiceCore} from "./migration-service-core";
 import {StringParameter} from "aws-cdk-lib/aws-ssm";
-import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam";
+import {Effect, PolicyStatement, Role, ServicePrincipal} from "aws-cdk-lib/aws-iam";
 import {
     createMSKConsumerIAMPolicies,
     createOpenSearchIAMAccessPolicy,
@@ -54,10 +54,11 @@ export class MigrationConsoleStack extends MigrationServiceCore {
 
     constructor(scope: Construct, id: string, props: MigrationConsoleProps) {
         super(scope, id, props)
+        const domainAccessGroupId = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osAccessSecurityGroupId`)
         let securityGroups = [
             SecurityGroup.fromSecurityGroupId(this, "serviceConnectSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/serviceConnectSecurityGroupId`)),
             SecurityGroup.fromSecurityGroupId(this, "trafficStreamSourceAccessSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/trafficStreamSourceAccessSecurityGroupId`)),
-            SecurityGroup.fromSecurityGroupId(this, "defaultDomainAccessSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osAccessSecurityGroupId`)),
+            SecurityGroup.fromSecurityGroupId(this, "defaultDomainAccessSG", domainAccessGroupId),
             SecurityGroup.fromSecurityGroupId(this, "replayerOutputAccessSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/replayerOutputAccessSecurityGroupId`))
         ]
         if (props.migrationAnalyticsEnabled) {
@@ -112,6 +113,18 @@ export class MigrationConsoleStack extends MigrationServiceCore {
         }
         if (props.fetchMigrationEnabled) {
             environment["FETCH_MIGRATION_COMMAND"] = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/fetchMigrationCommand`)
+            // [POC] Add a pipeline role for OSIS
+            const osisPipelineRole = new Role(this, 'osisPipelineRole', {
+                assumedBy: new ServicePrincipal('osis-pipelines.amazonaws.com'),
+                description: 'OSIS Pipeline role for Fetch Migration'
+            });
+            // Add policy to allow access to Opensearch domains
+            osisPipelineRole.addToPolicy(new PolicyStatement({
+                effect: Effect.ALLOW,
+                actions: 
["es:DescribeDomain", "es:ESHttp*"], + resources: [`arn:aws:es:${props.env?.region}:${props.env?.account}:domain/*`] + })) + environment["OSIS_PIPELINE_ROLE_ARN"] = osisPipelineRole.roleArn const fetchMigrationTaskDefArn = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/fetchMigrationTaskDefArn`); const fetchMigrationTaskRunPolicy = new PolicyStatement({ @@ -124,15 +137,41 @@ export class MigrationConsoleStack extends MigrationServiceCore { const fetchMigrationTaskRoleArn = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/fetchMigrationTaskRoleArn`); const fetchMigrationTaskExecRoleArn = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/fetchMigrationTaskExecRoleArn`); // Required as per https://docs.aws.amazon.com/AmazonECS/latest/userguide/task-iam-roles.html + // [POC] Allow passing of pipeline role const fetchMigrationPassRolePolicy = new PolicyStatement({ effect: Effect.ALLOW, - resources: [fetchMigrationTaskRoleArn, fetchMigrationTaskExecRoleArn], + resources: [fetchMigrationTaskRoleArn, fetchMigrationTaskExecRoleArn, osisPipelineRole.roleArn], actions: [ "iam:PassRole" ] }) servicePolicies.push(fetchMigrationTaskRunPolicy) servicePolicies.push(fetchMigrationPassRolePolicy) + + // [POC] Enable Migration Console to fetch pipeline from Secrets Manager + const osiMigrationGetSecretPolicy = new PolicyStatement({ + effect: Effect.ALLOW, + resources: [`arn:aws:secretsmanager:${props.env?.region}:${props.env?.account}:secret:${props.stage}-${props.defaultDeployId}-fetch-migration-pipelineConfig-*`], + actions: [ + "secretsmanager:GetSecretValue" + ] + }) + + // [POC] Enable OSIS management from Migration Console + const osisManagementPolicy = new PolicyStatement({ + effect: Effect.ALLOW, + resources: ["*"], + actions: [ + "osis:*" + ] + }) + taskRolePolicies.push(fetchMigrationTaskRunPolicy) + taskRolePolicies.push(fetchMigrationPassRolePolicy) + taskRolePolicies.push(osiMigrationGetSecretPolicy) + taskRolePolicies.push(osisManagementPolicy) + + // [POC] Add VPC options to environment + environment["OSIS_PIPELINE_VPC_OPTIONS"] = `SubnetIds=${props.vpc.privateSubnets.map(_ => _.subnetId).join(",")},SecurityGroupIds=${domainAccessGroupId}` } this.createService({ @@ -149,4 +188,4 @@ export class MigrationConsoleStack extends MigrationServiceCore { }); } -} \ No newline at end of file +}