This repository has been archived by the owner on Apr 11, 2024. It is now read-only.
Forked from opensearch-project/opensearch-migrations

Commit
[POC] Fetch Migration implementation using AWS OpenSearch Ingestion (and Migration Console for metadata migration)

This change introduces a "runOSIHistoricalDataMigration.sh" shell script in the Migration Console image that enables historical data migration using AWS OpenSearch Ingestion (OSI). Metadata migration is now performed directly from the Migration Console, using the same Python scripts as the Fetch Migration image. The Python scripts have been tweaked to configure logging output (Fetch Migration manages this via the fetch_orchestrator.py entrypoint, which is unnecessary here).

The output pipeline file from the metadata migration step is then provided to a new Python file - "osi_data_migration.py" - whose purpose is to modify the pipeline file to meet OSI expectations (see the sketch below). The modifications include:
* Adding a "version: 2" configuration
* Removing the "workers" and "delay" configuration, since these are managed by OSI
* Removing the "disable_authentication" configuration, since it is not supported by OSI
* Adding AWS SigV4 configuration with region and STS role, since these are required by OSI

The code finally writes the modified pipeline configuration to a new file, which the shell script then uses to create an OSI pipeline via AWS CLI commands. Access permissions have been added to the Console CDK to enable it to manage the OSI pipeline and to fetch the pipeline secret value from Secrets Manager. Environment variables have also been added to enable execution of this workflow.

Some important caveats of this POC:
1 - The Fetch Migration files have been duplicated into the Migration Console folder. TODO - Find a better way to lay this out.
2 - The Python dependencies for metadata migration (from requirements.txt) are not installed by the Docker image, since this causes conflicts with the botocore library from OpenSearch Benchmarks. The shell script runs the installation before executing the migration Python file.
3 - This POC implementation downloads the pipeline secret value to the Migration Console instance, which may be a security risk.
4 - OSI imposes several restrictions that this implementation works around but does not solve. Since OSI does not support basic auth, this POC has only been tested against source and target clusters within a VPC that do not have any auth enabled. However, SigV4 configuration is required by OSI, so it is forced into the configuration (though the signing has no effect on requests, since both endpoints have no auth configured). OpenSearch Serverless and endpoints with SigV4 auth have NOT been tested.
5 - This historical data migration implementation does not include pipeline monitoring and auto-shutdown, since the OSI pipeline does not expose a metrics endpoint. The OSI pipeline must be shut down manually (either via AWS CLI commands from the Console, or from the AWS UI). TODO - Build integration with CloudWatch for monitoring.

Signed-off-by: Kartik Ganesh <[email protected]>
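To make the transformation concrete, below is a minimal sketch of the kind of rewriting described above. It is illustrative only: the key names ("version", "workers", "delay", "disable_authentication", "aws") come from this commit message, but the function name, file paths, role ARN, and the exact shape of the SigV4 block are assumptions, not the committed osi_data_migration.py.

```python
import yaml


def transform_for_osi(pipeline: dict, region: str, sts_role_arn: str) -> dict:
    # OSI requires an explicit version identifier
    pipeline["version"] = "2"
    for body in pipeline.values():
        if not isinstance(body, dict):
            continue
        # "workers" and "delay" are managed by OSI itself, so strip them
        body.pop("workers", None)
        body.pop("delay", None)
        for section_key in ("source", "sink"):
            section = body.get(section_key)
            # A sink may be a list of plugin configs; a source is a single dict
            plugins = section if isinstance(section, list) else [section]
            for plugin in plugins:
                if not isinstance(plugin, dict):
                    continue
                for plugin_config in plugin.values():
                    if not isinstance(plugin_config, dict):
                        continue
                    # OSI does not support disabling authentication...
                    plugin_config.pop("disable_authentication", None)
                    # ...and requires SigV4 with an explicit region and STS role
                    plugin_config["aws"] = {"region": region, "sts_role_arn": sts_role_arn}
    return pipeline


if __name__ == "__main__":
    # Input/output file names and the role ARN are placeholders
    with open("pipeline.yaml") as f:
        config = yaml.safe_load(f)
    config = transform_for_osi(config, "us-east-1",
                               "arn:aws:iam::123456789012:role/osi-pipeline-role")
    with open("osi_pipeline.yaml", "w") as f:
        yaml.safe_dump(config, f)
    # The commit's shell script then creates the OSI pipeline via AWS CLI;
    # an equivalent boto3 call (assuming the "osis" client) would be roughly:
    #   boto3.client("osis").create_pipeline(
    #       PipelineName="fetch-migration", MinUnits=1, MaxUnits=4,
    #       PipelineConfigurationBody=yaml.safe_dump(config))
```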
Showing 16 changed files with 941 additions and 4 deletions.
34 changes: 34 additions & 0 deletions
...Capture/dockerSolution/src/main/docker/migrationConsole/python/component_template_info.py
```python
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from typing import Optional

# Constants
NAME_KEY = "name"
DEFAULT_TEMPLATE_KEY = "component_template"


# Class that encapsulates component template information
class ComponentTemplateInfo:
    # Private member variables
    __name: str
    __template_def: Optional[dict]

    def __init__(self, template_payload: dict, template_key: str = DEFAULT_TEMPLATE_KEY):
        self.__name = template_payload[NAME_KEY]
        self.__template_def = None
        if template_key in template_payload:
            self.__template_def = template_payload[template_key]

    def get_name(self) -> str:
        return self.__name

    def get_template_definition(self) -> Optional[dict]:
        return self.__template_def
```
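As a usage illustration (not part of the commit), a ComponentTemplateInfo can be built from one entry of a cluster's component-template listing; the payload below is hypothetical:

```python
# Hypothetical payload, shaped like one entry of a GET _component_template response
payload = {
    "name": "shared-settings",
    "component_template": {
        "template": {"settings": {"number_of_shards": 1}}
    }
}

info = ComponentTemplateInfo(payload)
print(info.get_name())                 # shared-settings
print(info.get_template_definition())  # nested template dict, or None when the key is absent
```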
50 changes: 50 additions & 0 deletions
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_info.py
```python
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from typing import Union

from requests_aws4auth import AWS4Auth


# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
class EndpointInfo:
    # Private member variables
    __url: str
    # "|" operator is only supported in 3.10+
    __auth: Union[AWS4Auth, tuple, None]
    __verify_ssl: bool

    def __init__(self, url: str, auth: Union[AWS4Auth, tuple, None] = None, verify_ssl: bool = True):
        self.__url = url
        # Normalize url value to have trailing slash
        if not url.endswith("/"):
            self.__url += "/"
        self.__auth = auth
        self.__verify_ssl = verify_ssl

    def __eq__(self, obj):
        return isinstance(obj, EndpointInfo) and \
            self.__url == obj.__url and \
            self.__auth == obj.__auth and \
            self.__verify_ssl == obj.__verify_ssl

    def add_path(self, path: str) -> str:
        # Remove leading slash if present
        if path.startswith("/"):
            path = path[1:]
        return self.__url + path

    def get_url(self) -> str:
        return self.__url

    def get_auth(self) -> Union[AWS4Auth, tuple, None]:
        return self.__auth

    def is_verify_ssl(self) -> bool:
        return self.__verify_ssl
```
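A brief usage sketch (the endpoint and credentials are placeholders), showing the URL normalization and path handling:

```python
endpoint = EndpointInfo("https://target-cluster:9200", auth=("admin", "admin"), verify_ssl=False)
print(endpoint.get_url())                  # "https://target-cluster:9200/" - trailing slash added
print(endpoint.add_path("/_cat/indices"))  # leading slash stripped, so no "//" in the result
print(endpoint.is_verify_ssl())            # False
```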
164 changes: 164 additions & 0 deletions
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/endpoint_utils.py
```python
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

import re
from typing import Optional, Union

from requests_aws4auth import AWS4Auth
from botocore.session import Session

from endpoint_info import EndpointInfo

# Constants
SOURCE_KEY = "source"
SINK_KEY = "sink"
SUPPORTED_PLUGINS = ["opensearch", "elasticsearch"]
HOSTS_KEY = "hosts"
INSECURE_KEY = "insecure"
CONNECTION_KEY = "connection"
DISABLE_AUTH_KEY = "disable_authentication"
USER_KEY = "username"
PWD_KEY = "password"
AWS_SIGV4_KEY = "aws_sigv4"
AWS_REGION_KEY = "aws_region"
AWS_CONFIG_KEY = "aws"
AWS_CONFIG_REGION_KEY = "region"
IS_SERVERLESS_KEY = "serverless"
ES_SERVICE_NAME = "es"
AOSS_SERVICE_NAME = "aoss"
URL_REGION_PATTERN = re.compile(r"([\w-]*)\.(es|aoss)\.amazonaws\.com")


def __get_url(plugin_config: dict) -> str:
    # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
    # This tool needs one accessible host, so we pick the first entry in the latter case.
    return plugin_config[HOSTS_KEY][0] if isinstance(plugin_config[HOSTS_KEY], list) else plugin_config[HOSTS_KEY]


# Helper function that attempts to extract the AWS region from a URL,
# assuming it is of the form *.<region>.<service>.amazonaws.com
def __derive_aws_region_from_url(url: str) -> Optional[str]:
    match = URL_REGION_PATTERN.search(url)
    if match:
        # Index 0 returns the entire match, index 1 returns only the first group
        return match.group(1)
    return None


def get_aws_region(plugin_config: dict) -> str:
    if plugin_config.get(AWS_SIGV4_KEY, False) and plugin_config.get(AWS_REGION_KEY, None) is not None:
        return plugin_config[AWS_REGION_KEY]
    elif plugin_config.get(AWS_CONFIG_KEY, None) is not None:
        aws_config = plugin_config[AWS_CONFIG_KEY]
        if not isinstance(aws_config, dict):
            raise ValueError("Unexpected value for 'aws' configuration")
        elif aws_config.get(AWS_CONFIG_REGION_KEY, None) is not None:
            return aws_config[AWS_CONFIG_REGION_KEY]
    # Region not explicitly defined, attempt to derive from URL
    derived_region = __derive_aws_region_from_url(__get_url(plugin_config))
    if derived_region is None:
        raise ValueError("No region configured for AWS SigV4 auth, or derivable from host URL")
    return derived_region


def __check_supported_endpoint(section_config: dict) -> Optional[tuple]:
    for supported_type in SUPPORTED_PLUGINS:
        if supported_type in section_config:
            return supported_type, section_config[supported_type]


# This config key may be either directly in the main dict (for sink)
# or inside a nested dict (for source). The default value is False.
def is_insecure(plugin_config: dict) -> bool:
    if INSECURE_KEY in plugin_config:
        return plugin_config[INSECURE_KEY]
    elif CONNECTION_KEY in plugin_config and INSECURE_KEY in plugin_config[CONNECTION_KEY]:
        return plugin_config[CONNECTION_KEY][INSECURE_KEY]
    return False


def validate_pipeline(pipeline: dict):
    if SOURCE_KEY not in pipeline:
        raise ValueError("Missing source configuration in Data Prepper pipeline YAML")
    if SINK_KEY not in pipeline:
        raise ValueError("Missing sink configuration in Data Prepper pipeline YAML")


def validate_auth(plugin_name: str, plugin_config: dict):
    # If auth is disabled, no further validation is required
    if plugin_config.get(DISABLE_AUTH_KEY, False):
        return
    # If AWS SigV4 is configured, validate region
    if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        # Raises a ValueError if region cannot be derived
        get_aws_region(plugin_config)
    # Validate basic auth
    elif USER_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name)
    elif PWD_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no password for username) for plugin: " + plugin_name)


def get_supported_endpoint_config(pipeline_config: dict, section_key: str) -> tuple:
    # The value of each key may be a single plugin (as a dict) or a list of plugin configs
    supported_tuple = tuple()
    if isinstance(pipeline_config[section_key], dict):
        supported_tuple = __check_supported_endpoint(pipeline_config[section_key])
    elif isinstance(pipeline_config[section_key], list):
        for entry in pipeline_config[section_key]:
            supported_tuple = __check_supported_endpoint(entry)
            # Break out of the loop at the first supported type
            if supported_tuple:
                break
    if not supported_tuple:
        raise ValueError("Could not find any supported endpoints in section: " + section_key)
    # First tuple value is the plugin name, second value is the plugin config dict
    return supported_tuple[0], supported_tuple[1]


def get_aws_sigv4_auth(region: str, is_serverless: bool = False) -> AWS4Auth:
    credentials = Session().get_credentials()
    if not credentials:
        raise ValueError("Unable to fetch AWS session credentials for SigV4 auth")
    if is_serverless:
        return AWS4Auth(region=region, service=AOSS_SERVICE_NAME, refreshable_credentials=credentials)
    else:
        return AWS4Auth(region=region, service=ES_SERVICE_NAME, refreshable_credentials=credentials)


def get_auth(plugin_config: dict) -> Union[AWS4Auth, tuple, None]:
    # Basic auth
    if USER_KEY in plugin_config and PWD_KEY in plugin_config:
        return plugin_config[USER_KEY], plugin_config[PWD_KEY]
    elif plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        is_serverless = False
        # OpenSearch Serverless uses a different service name
        if AWS_CONFIG_KEY in plugin_config:
            aws_config = plugin_config[AWS_CONFIG_KEY]
            if isinstance(aws_config, dict) and aws_config.get(IS_SERVERLESS_KEY, False):
                is_serverless = True
        region = get_aws_region(plugin_config)
        return get_aws_sigv4_auth(region, is_serverless)
    return None


def get_endpoint_info_from_plugin_config(plugin_config: dict) -> EndpointInfo:
    # verify boolean will be the inverse of the insecure SSL key, if present
    should_verify = not is_insecure(plugin_config)
    return EndpointInfo(__get_url(plugin_config), get_auth(plugin_config), should_verify)


def get_endpoint_info_from_pipeline_config(pipeline_config: dict, section_key: str) -> EndpointInfo:
    # Raises a ValueError if no supported endpoints are found
    plugin_name, plugin_config = get_supported_endpoint_config(pipeline_config, section_key)
    if HOSTS_KEY not in plugin_config:
        raise ValueError("No hosts defined for plugin: " + plugin_name)
    # Raises a ValueError if there is an error in the auth configuration
    validate_auth(plugin_name, plugin_config)
    return get_endpoint_info_from_plugin_config(plugin_config)
```
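A minimal usage sketch, assuming a Data Prepper-style pipeline dict (hosts and credentials are placeholders; the SigV4 path additionally requires resolvable AWS credentials in the environment):

```python
pipeline_config = {
    "source": {
        "opensearch": {
            # Region is also derivable from this URL via URL_REGION_PATTERN
            "hosts": ["https://source-domain.us-east-1.es.amazonaws.com"],
            "aws": {"region": "us-east-1"},
        }
    },
    "sink": [{
        "opensearch": {
            "hosts": ["https://target-cluster:9200"],
            "username": "admin",
            "password": "admin",
        }
    }],
}

validate_pipeline(pipeline_config)
# Resolves the sink to an EndpointInfo carrying basic-auth credentials
target = get_endpoint_info_from_pipeline_config(pipeline_config, SINK_KEY)
# Resolves the source to an EndpointInfo carrying a SigV4 signer (needs AWS credentials)
source = get_endpoint_info_from_pipeline_config(pipeline_config, SOURCE_KEY)
print(source.get_url(), target.get_url())
```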
44 changes: 44 additions & 0 deletions
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_diff.py
```python
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

import utils
from index_operations import SETTINGS_KEY, MAPPINGS_KEY


# Computes and captures differences in indices between a "source" cluster
# and a "target" cluster. Indices that exist on the source cluster but not
# on the target cluster are considered "to-create". "Conflicting" indices
# are present on both source and target clusters, but differ in their index
# settings or mappings.
class IndexDiff:
    indices_to_create: set
    identical_indices: set
    identical_empty_indices: set
    conflicting_indices: set

    def __init__(self, source: dict, target: dict):
        self.identical_empty_indices = set()
        self.conflicting_indices = set()
        # Compute index names that are present in both the source and target
        indices_intersection = set(source.keys()) & set(target.keys())
        # Check if these "common" indices are identical or have metadata conflicts
        for index in indices_intersection:
            # Check settings
            if utils.has_differences(SETTINGS_KEY, source[index], target[index]):
                self.conflicting_indices.add(index)
            # Check mappings
            if utils.has_differences(MAPPINGS_KEY, source[index], target[index]):
                self.conflicting_indices.add(index)
        # Identical indices are the subset that do not have metadata conflicts
        self.identical_indices = set(indices_intersection) - set(self.conflicting_indices)
        # Indices that are not already on the target need to be created
        self.indices_to_create = set(source.keys()) - set(indices_intersection)

    def set_identical_empty_indices(self, indices: set):
        self.identical_empty_indices = indices
```
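A small worked example with hypothetical index metadata, assuming utils.has_differences deep-compares the given key between the two entries:

```python
source = {
    "logs-2023": {"settings": {"index": {"number_of_shards": "1"}}, "mappings": {}},
    "metrics": {"settings": {"index": {"number_of_shards": "1"}}, "mappings": {}},
}
target = {
    "logs-2023": {"settings": {"index": {"number_of_shards": "2"}}, "mappings": {}},
}

diff = IndexDiff(source, target)
print(diff.indices_to_create)    # {'metrics'} - present only on the source
print(diff.conflicting_indices)  # {'logs-2023'} - settings differ between clusters
print(diff.identical_indices)    # set() - no conflict-free common indices
```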
17 changes: 17 additions & 0 deletions
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/python/index_doc_count.py
```python
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from dataclasses import dataclass


# Captures the doc_count for indices in a cluster, and also computes a total
@dataclass
class IndexDocCount:
    total: int
    index_doc_count_map: dict
```
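For illustration, a hypothetical instance (the values are made up):

```python
counts = IndexDocCount(total=1250, index_doc_count_map={"logs-2023": 1000, "metrics": 250})
print(counts.total)                           # 1250
print(counts.index_doc_count_map["metrics"])  # 250
```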