
[WIP 2] Enable Migration Console to run metadata migration logic directly

TODO - The Python files are currently duplicated from FetchMigration; a better way to lay this out is still needed. Also, the Fetch Migration Python code is not installed by the Dockerfile, since installing it conflicts with the botocore library used by OpenSearch Benchmark.

This change adds a "runOSIHistoricalDataMigration" shell script to the Migration Console image and grants the Console permission to fetch the pipeline secret value from Secrets Manager.
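
The script itself is not shown in the excerpt below. As a hedged illustration of the Secrets Manager lookup that the new permission enables, a minimal boto3 sketch might look like the following; the secret ID and function name are placeholders, not values from this commit:

import boto3

# Hypothetical sketch - the real secret ID and how the pipeline YAML is
# consumed are defined by the deployment, not by this diff.
def fetch_pipeline_yaml(secret_id: str = "migration-pipeline-secret") -> str:
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    # The Data Prepper pipeline definition is stored as a plain SecretString
    return response["SecretString"]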

Signed-off-by: Kartik Ganesh <[email protected]>
kartg committed Dec 12, 2023
1 parent 5dbbd8d commit 0cf7ecc
Showing 14 changed files with 728 additions and 0 deletions.
@@ -13,12 +13,18 @@ COPY runTestBenchmarks.sh /root/
COPY humanReadableLogs.py /root/
COPY catIndices.sh /root/
COPY showFetchMigrationCommand.sh /root/
COPY runOSIHistoricalDataMigration.sh /root/
COPY msk-iam-auth.properties /root/kafka-tools/aws
COPY kafkaCmdRef.md /root/kafka-tools
RUN chmod ug+x /root/runTestBenchmarks.sh
RUN chmod ug+x /root/humanReadableLogs.py
RUN chmod ug+x /root/catIndices.sh
RUN chmod ug+x /root/showFetchMigrationCommand.sh
RUN chmod ug+x /root/runOSIHistoricalDataMigration.sh
COPY python/ /root/python/
# TODO - Installing here messes with OSB's botocore
#WORKDIR /root/python
#RUN pip3 install --user -r requirements.txt
WORKDIR /root/kafka-tools
# Get kafka distribution and unpack to 'kafka'
RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz
@@ -0,0 +1,50 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from typing import Union

from requests_aws4auth import AWS4Auth


# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
class EndpointInfo:
    # Private member variables
    __url: str
    # "|" operator is only supported in 3.10+
    __auth: Union[AWS4Auth, tuple, None]
    __verify_ssl: bool

    def __init__(self, url: str, auth: Union[AWS4Auth, tuple, None] = None, verify_ssl: bool = True):
        self.__url = url
        # Normalize url value to have trailing slash
        if not url.endswith("/"):
            self.__url += "/"
        self.__auth = auth
        self.__verify_ssl = verify_ssl

    def __eq__(self, obj):
        return isinstance(obj, EndpointInfo) and \
            self.__url == obj.__url and \
            self.__auth == obj.__auth and \
            self.__verify_ssl == obj.__verify_ssl

    def add_path(self, path: str) -> str:
        # Remove leading slash if present
        if path.startswith("/"):
            path = path[1:]
        return self.__url + path

    def get_url(self) -> str:
        return self.__url

    def get_auth(self) -> Union[AWS4Auth, tuple, None]:
        return self.__auth

    def is_verify_ssl(self) -> bool:
        return self.__verify_ssl
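
Not part of the diff, but a quick usage sketch of how EndpointInfo behaves: the constructor normalizes the URL to a trailing slash, and add_path strips a leading slash, so the two never combine into a double slash. The host and credentials below are invented for illustration:

endpoint = EndpointInfo("https://search.example.com:9200", auth=("admin", "admin"))
assert endpoint.get_url() == "https://search.example.com:9200/"
assert endpoint.add_path("/_cat/indices") == "https://search.example.com:9200/_cat/indices"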
@@ -0,0 +1,164 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

import re
from typing import Optional, Union

from requests_aws4auth import AWS4Auth
from botocore.session import Session

from endpoint_info import EndpointInfo

# Constants
SOURCE_KEY = "source"
SINK_KEY = "sink"
SUPPORTED_PLUGINS = ["opensearch", "elasticsearch"]
HOSTS_KEY = "hosts"
INSECURE_KEY = "insecure"
CONNECTION_KEY = "connection"
DISABLE_AUTH_KEY = "disable_authentication"
USER_KEY = "username"
PWD_KEY = "password"
AWS_SIGV4_KEY = "aws_sigv4"
AWS_REGION_KEY = "aws_region"
AWS_CONFIG_KEY = "aws"
AWS_CONFIG_REGION_KEY = "region"
IS_SERVERLESS_KEY = "serverless"
ES_SERVICE_NAME = "es"
AOSS_SERVICE_NAME = "aoss"
URL_REGION_PATTERN = re.compile(r"([\w-]*)\.(es|aoss)\.amazonaws\.com")


def __get_url(plugin_config: dict) -> str:
    # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
    # This tool needs one accessible host, so we pick the first entry in the latter case.
    return plugin_config[HOSTS_KEY][0] if isinstance(plugin_config[HOSTS_KEY], list) else plugin_config[HOSTS_KEY]


# Helper function that attempts to extract the AWS region from a URL,
# assuming it is of the form *.<region>.<service>.amazonaws.com
def __derive_aws_region_from_url(url: str) -> Optional[str]:
    match = URL_REGION_PATTERN.search(url)
    if match:
        # Index 0 returns the entire match, index 1 returns only the first group
        return match.group(1)
    return None


def get_aws_region(plugin_config: dict) -> str:
    if plugin_config.get(AWS_SIGV4_KEY, False) and plugin_config.get(AWS_REGION_KEY, None) is not None:
        return plugin_config[AWS_REGION_KEY]
    elif plugin_config.get(AWS_CONFIG_KEY, None) is not None:
        aws_config = plugin_config[AWS_CONFIG_KEY]
        if not isinstance(aws_config, dict):
            raise ValueError("Unexpected value for 'aws' configuration")
        elif aws_config.get(AWS_CONFIG_REGION_KEY, None) is not None:
            return aws_config[AWS_CONFIG_REGION_KEY]
    # Region not explicitly defined, attempt to derive from URL
    derived_region = __derive_aws_region_from_url(__get_url(plugin_config))
    if derived_region is None:
        raise ValueError("No region configured for AWS SigV4 auth, or derivable from host URL")
    return derived_region


def __check_supported_endpoint(section_config: dict) -> Optional[tuple]:
    for supported_type in SUPPORTED_PLUGINS:
        if supported_type in section_config:
            return supported_type, section_config[supported_type]


# This config key may be either directly in the main dict (for sink)
# or inside a nested dict (for source). The default value is False.
def is_insecure(plugin_config: dict) -> bool:
    if INSECURE_KEY in plugin_config:
        return plugin_config[INSECURE_KEY]
    elif CONNECTION_KEY in plugin_config and INSECURE_KEY in plugin_config[CONNECTION_KEY]:
        return plugin_config[CONNECTION_KEY][INSECURE_KEY]
    return False


def validate_pipeline(pipeline: dict):
    if SOURCE_KEY not in pipeline:
        raise ValueError("Missing source configuration in Data Prepper pipeline YAML")
    if SINK_KEY not in pipeline:
        raise ValueError("Missing sink configuration in Data Prepper pipeline YAML")


def validate_auth(plugin_name: str, plugin_config: dict):
    # If auth is disabled, no further validation is required
    if plugin_config.get(DISABLE_AUTH_KEY, False):
        return
    # If AWS SigV4 is configured, validate region
    if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        # Raises a ValueError if region cannot be derived
        get_aws_region(plugin_config)
    # Validate basic auth
    elif USER_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name)
    elif PWD_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no password for username) for plugin: " + plugin_name)


def get_supported_endpoint_config(pipeline_config: dict, section_key: str) -> tuple:
    # The value of each key may be a single plugin (as a dict) or a list of plugin configs
    supported_tuple = tuple()
    if isinstance(pipeline_config[section_key], dict):
        supported_tuple = __check_supported_endpoint(pipeline_config[section_key])
    elif isinstance(pipeline_config[section_key], list):
        for entry in pipeline_config[section_key]:
            supported_tuple = __check_supported_endpoint(entry)
            # Break out of the loop at the first supported type
            if supported_tuple:
                break
    if not supported_tuple:
        raise ValueError("Could not find any supported endpoints in section: " + section_key)
    # First tuple value is the plugin name, second value is the plugin config dict
    return supported_tuple[0], supported_tuple[1]


def get_aws_sigv4_auth(region: str, is_serverless: bool = False) -> AWS4Auth:
    credentials = Session().get_credentials()
    if not credentials:
        raise ValueError("Unable to fetch AWS session credentials for SigV4 auth")
    if is_serverless:
        return AWS4Auth(region=region, service=AOSS_SERVICE_NAME, refreshable_credentials=credentials)
    else:
        return AWS4Auth(region=region, service=ES_SERVICE_NAME, refreshable_credentials=credentials)


def get_auth(plugin_config: dict) -> Union[AWS4Auth, tuple, None]:
    # Basic auth
    if USER_KEY in plugin_config and PWD_KEY in plugin_config:
        return plugin_config[USER_KEY], plugin_config[PWD_KEY]
    elif plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        is_serverless = False
        # OpenSearch Serverless uses a different service name
        if AWS_CONFIG_KEY in plugin_config:
            aws_config = plugin_config[AWS_CONFIG_KEY]
            if isinstance(aws_config, dict) and aws_config.get(IS_SERVERLESS_KEY, False):
                is_serverless = True
        region = get_aws_region(plugin_config)
        return get_aws_sigv4_auth(region, is_serverless)
    return None


def get_endpoint_info_from_plugin_config(plugin_config: dict) -> EndpointInfo:
    # verify boolean will be the inverse of the insecure SSL key, if present
    should_verify = not is_insecure(plugin_config)
    return EndpointInfo(__get_url(plugin_config), get_auth(plugin_config), should_verify)


def get_endpoint_info_from_pipeline_config(pipeline_config: dict, section_key: str) -> EndpointInfo:
    # Raises a ValueError if no supported endpoints are found
    plugin_name, plugin_config = get_supported_endpoint_config(pipeline_config, section_key)
    if HOSTS_KEY not in plugin_config:
        raise ValueError("No hosts defined for plugin: " + plugin_name)
    # Raises a ValueError if there is an error in the auth configuration
    validate_auth(plugin_name, plugin_config)
    return get_endpoint_info_from_plugin_config(plugin_config)
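
To make the config traversal concrete, here is a hedged sketch (not in the diff) of a minimal pipeline dict these helpers would accept. Host names and credentials are invented for illustration; note that a section may be a single plugin dict or a list of plugin configs, and both forms are handled:

pipeline = {
    "source": {
        "opensearch": {
            "hosts": ["https://source.example.com:9200"],
            "username": "admin",
            "password": "admin",
        }
    },
    "sink": [{"opensearch": {"hosts": ["https://target.example.com:9200"],
                             "disable_authentication": True}}],
}
validate_pipeline(pipeline)
source_endpoint = get_endpoint_info_from_pipeline_config(pipeline, SOURCE_KEY)
# Basic auth surfaces as a (username, password) tuple on the EndpointInfo
assert source_endpoint.get_auth() == ("admin", "admin")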
@@ -0,0 +1,44 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

import utils
from index_operations import SETTINGS_KEY, MAPPINGS_KEY


# Computes and captures differences in indices between a "source" cluster
# and a "target" cluster. Indices that exist on the source cluster but not
# on the target cluster are considered "to-create". "Conflicting" indices
# are present on both source and target clusters, but differ in their index
# settings or mappings.
class IndexDiff:
    indices_to_create: set
    identical_indices: set
    identical_empty_indices: set
    conflicting_indices: set

    def __init__(self, source: dict, target: dict):
        self.identical_empty_indices = set()
        self.conflicting_indices = set()
        # Compute index names that are present in both the source and target
        indices_intersection = set(source.keys()) & set(target.keys())
        # Check if these "common" indices are identical or have metadata conflicts
        for index in indices_intersection:
            # Check settings
            if utils.has_differences(SETTINGS_KEY, source[index], target[index]):
                self.conflicting_indices.add(index)
            # Check mappings
            if utils.has_differences(MAPPINGS_KEY, source[index], target[index]):
                self.conflicting_indices.add(index)
        # Identical indices are the subset that do not have metadata conflicts
        self.identical_indices = set(indices_intersection) - set(self.conflicting_indices)
        # Indices that are not already on the target need to be created
        self.indices_to_create = set(source.keys()) - set(indices_intersection)

    def set_identical_empty_indices(self, indices: set):
        self.identical_empty_indices = indices
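
utils.has_differences is not part of this diff; assuming it flags any difference under the given key, the classification plays out as in this hypothetical sketch, where the index metadata values are invented:

source = {
    "logs-1": {"settings": {"number_of_shards": "1"}, "mappings": {}},
    "logs-2": {"settings": {"number_of_shards": "1"}, "mappings": {}},
}
target = {
    # Same name as the source index, but different settings -> conflict
    "logs-1": {"settings": {"number_of_shards": "2"}, "mappings": {}},
}
diff = IndexDiff(source, target)
assert diff.indices_to_create == {"logs-2"}
assert diff.conflicting_indices == {"logs-1"}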
@@ -0,0 +1,17 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from dataclasses import dataclass


# Captures the per-index doc_count for a cluster, along with the total across all indices
@dataclass
class IndexDocCount:
    total: int
    index_doc_count_map: dict
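
A brief usage sketch with hypothetical values; the dataclass simply carries the per-index counts alongside their sum:

counts = IndexDocCount(total=1200, index_doc_count_map={"logs-1": 1000, "logs-2": 200})
assert counts.total == sum(counts.index_doc_count_map.values())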
