Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
[Fetch Migration] Added constructs and logic for component and index …
Browse files Browse the repository at this point in the history
…template migration

This includes class representations of component and index template information (and their unit tests). Index_operations now also has new functions to fetch and create component/index templates - unit test coverage for this is TBD. Finally, the template migration logic has been added to metadata_migration

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Dec 13, 2023
1 parent 859362a commit 5562931
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 4 deletions.
34 changes: 34 additions & 0 deletions FetchMigration/python/component_template_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#


# Constants
from typing import Optional

NAME_KEY = "name"
DEFAULT_TEMPLATE_KEY = "component_template"


# Class that encapsulates component template information
class ComponentTemplateInfo:
    # Private member variables
    # Template name, read from the payload's "name" entry
    __name: str
    # Template definition, or None when the payload has no entry under the template key
    __template_def: Optional[dict]

    # Builds the template info from a single template payload.
    # template_payload must contain the NAME_KEY entry (a KeyError is raised otherwise).
    # template_key selects the payload entry that holds the template definition; when
    # that entry is absent, the definition is stored as None.
    def __init__(self, template_payload: dict, template_key: str = DEFAULT_TEMPLATE_KEY):
        self.__name = template_payload[NAME_KEY]
        # dict.get yields None for a missing key, matching the "no definition" case
        self.__template_def = template_payload.get(template_key)

    # Returns the template name
    def get_name(self) -> str:
        return self.__name

    # Returns the template definition, or None if the payload did not contain one
    def get_template_definition(self) -> Optional[dict]:
        return self.__template_def
57 changes: 57 additions & 0 deletions FetchMigration/python/index_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
import jsonpath_ng
import requests

from component_template_info import ComponentTemplateInfo
from endpoint_info import EndpointInfo
from index_doc_count import IndexDocCount
from index_template_info import IndexTemplateInfo

# Constants
SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
ALIASES_KEY = "aliases"
COUNT_KEY = "count"
__INDEX_KEY = "index"
__COMPONENT_TEMPLATE_LIST_KEY = "component_templates"
__INDEX_TEMPLATE_LIST_KEY = "index_templates"
__INDEX_TEMPLATES_PATH = "/_index_template"
__COMPONENT_TEMPLATES_PATH = "/_component_template"
__ALL_INDICES_ENDPOINT = "*"
__SEARCH_COUNT_PATH = "/_search?size=0"
__SEARCH_COUNT_PAYLOAD = {"aggs": {"count": {"terms": {"field": "_index"}}}}
Expand Down Expand Up @@ -104,3 +110,54 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
return IndexDocCount(total, count_map)
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch doc_count: {e!s}")


# Fetches the templates exposed at 'path' on the given endpoint and converts each
# entry of the response's 'root_key' list into an object by calling 'factory' on it.
# Returns a set of the created objects, which is empty if 'root_key' is not present
# in the response body.
def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) -> set:
    url: str = endpoint.add_path(path)
    # raises RuntimeError in case of any request errors
    resp = __send_get_request(url, endpoint)
    # Parse the response body once and reuse it for the key check and the iteration
    payload = resp.json()
    if root_key not in payload:
        return set()
    return {factory(template) for template in payload[root_key]}


# Fetches all component templates from the cluster at the given endpoint.
# Returns a set of ComponentTemplateInfo objects, one per fetched template.
# Raises RuntimeError (chained to the original error) if the fetch fails.
def fetch_all_component_templates(endpoint: EndpointInfo) -> set[ComponentTemplateInfo]:
    try:
        # raises RuntimeError in case of any request errors
        # The class itself serves as the factory for each raw template payload
        return __fetch_templates(endpoint, __COMPONENT_TEMPLATES_PATH, __COMPONENT_TEMPLATE_LIST_KEY,
                                 ComponentTemplateInfo)
    except RuntimeError as e:
        # Chain explicitly so the underlying request failure stays attached as the cause
        raise RuntimeError(f"Failed to fetch component template metadata from cluster endpoint: {e!s}") from e


# Fetches all index templates from the cluster at the given endpoint.
# Returns a set of IndexTemplateInfo objects, one per fetched template.
# Raises RuntimeError (chained to the original error) if the fetch fails.
def fetch_all_index_templates(endpoint: EndpointInfo) -> set[IndexTemplateInfo]:
    try:
        # raises RuntimeError in case of any request errors
        # The class itself serves as the factory for each raw template payload
        return __fetch_templates(endpoint, __INDEX_TEMPLATES_PATH, __INDEX_TEMPLATE_LIST_KEY,
                                 IndexTemplateInfo)
    except RuntimeError as e:
        # Chain explicitly so the underlying request failure stays attached as the cause
        raise RuntimeError(f"Failed to fetch index template metadata from cluster endpoint: {e!s}") from e


# Issues one PUT request per template against 'template_path/<template name>' on
# the given endpoint, sending the template definition as the JSON body.
# A failed request does not stop the loop; the remaining templates are still attempted.
# Returns a dict that maps the name of every failed template to the request
# exception raised for it (empty when every request succeeded).
def __create_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo, template_path: str) -> dict:
    failed = {}
    for info in templates:
        name = info.get_name()
        put_url = endpoint.add_path(template_path + "/" + name)
        try:
            requests.put(put_url, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
                         json=info.get_template_definition(), timeout=__TIMEOUT_SECONDS).raise_for_status()
        except requests.exceptions.RequestException as err:
            # Record the failure and move on to the next template
            failed[name] = err
    return failed


# Creates the given component templates on the cluster at the given endpoint.
# Returns a dict of failures keyed by template name (empty if there were none).
def create_component_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo) -> dict:
    failures = __create_templates(templates, endpoint, template_path=__COMPONENT_TEMPLATES_PATH)
    return failures


# Creates the given index templates on the cluster at the given endpoint.
# Returns a dict of failures keyed by template name (empty if there were none).
def create_index_templates(templates: set[IndexTemplateInfo], endpoint: EndpointInfo) -> dict:
    failures = __create_templates(templates, endpoint, template_path=__INDEX_TEMPLATES_PATH)
    return failures
23 changes: 23 additions & 0 deletions FetchMigration/python/index_template_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from component_template_info import ComponentTemplateInfo

# Constants
INDEX_TEMPLATE_KEY = "index_template"


# Class that encapsulates index template information from a cluster.
# It extends ComponentTemplateInfo since an index template payload has
# the same structure as a component template payload; the only difference
# is the key under which the template definition is stored. Index
# templates can additionally be "composed" of one or more component templates.
class IndexTemplateInfo(ComponentTemplateInfo):
    # Delegates to the parent constructor, selecting the index template key
    def __init__(self, template_payload: dict):
        super().__init__(template_payload, template_key=INDEX_TEMPLATE_KEY)
35 changes: 34 additions & 1 deletion FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,35 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo,
return result


# Logs an error summary plus one error line per failed template.
# 'failures' maps template names to their errors, 'target_count' is the
# number of templates whose creation was attempted.
# Returns true if there were failures, false otherwise
def __log_template_failures(failures: dict, target_count: int) -> bool:
    fail_count = len(failures)
    if fail_count == 0:
        # Nothing failed, so there is nothing to log
        return False
    logging.error(f"Failed to create {fail_count} of {target_count} templates")
    for name in failures:
        cause = failures[name]
        logging.error(f"Template name {name} failed: {cause!s}")
    # Signal to the caller that failures were found
    return True


# Migrates component templates, then index templates, from source to target.
# Index templates are only attempted when every component template was created.
# Raises RuntimeError if component/index template migration fails
def template_migration(source: EndpointInfo, target: EndpointInfo):
    # Component templates go first
    component_templates = index_operations.fetch_all_component_templates(source)
    component_failures = index_operations.create_component_templates(component_templates, target)
    if __log_template_failures(component_failures, len(component_templates)):
        raise RuntimeError("Failed to create some component templates, aborting index template creation")
    # No component template failures, so proceed to index templates
    index_templates = index_operations.fetch_all_index_templates(source)
    index_failures = index_operations.create_index_templates(index_templates, target)
    if __log_template_failures(index_failures, len(index_templates)):
        raise RuntimeError("Failed to create some index templates")


def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
# Sanity check
if not args.report and len(args.output_file) == 0:
Expand All @@ -112,6 +141,10 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
if result.migration_indices and len(args.output_file) > 0:
write_output(dp_config, result.migration_indices, args.output_file)
logging.debug("Wrote output YAML pipeline to: " + args.output_file)
if not args.dryrun:
# Create component and index templates, may raise RuntimeError
template_migration(source_endpoint_info, target_endpoint_info)
# Finally return result
return result


Expand Down Expand Up @@ -143,6 +176,6 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
arg_parser.add_argument("--report", "-r", action="store_true",
help="Print a report of the index differences")
arg_parser.add_argument("--dryrun", action="store_true",
help="Skips the actual creation of indices on the target cluster")
help="Skips the actual creation of metadata on the target cluster")
namespace = arg_parser.parse_args()
run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
91 changes: 91 additions & 0 deletions FetchMigration/python/tests/test_index_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@
from responses import matchers

import index_operations
from component_template_info import ComponentTemplateInfo
from endpoint_info import EndpointInfo
from index_template_info import IndexTemplateInfo
from tests import test_constants


# Helper method to create a template API response holding a single template
# named "test". The per-template key is the list name without its last
# character (e.g. "index_templates" -> "index_template"). Settings and
# mappings are copied from 'body', defaulting to empty dicts when absent.
def create_base_template_response(list_name: str, body: dict) -> dict:
    singular_key = list_name[:-1]
    template_body = {
        test_constants.SETTINGS_KEY: body.get(test_constants.SETTINGS_KEY, {}),
        test_constants.MAPPINGS_KEY: body.get(test_constants.MAPPINGS_KEY, {}),
    }
    entry = {"name": "test", singular_key: {"template": template_body}}
    return {list_name: [entry]}


class TestIndexOperations(unittest.TestCase):
@responses.activate
def test_fetch_all_indices(self):
Expand Down Expand Up @@ -126,6 +136,87 @@ def test_get_request_errors(self):
self.assertRaises(RuntimeError, index_operations.fetch_all_indices,
EndpointInfo(test_constants.SOURCE_ENDPOINT))

@responses.activate
def test_fetch_all_component_templates_empty(self):
    url = test_constants.SOURCE_ENDPOINT + "_component_template"
    # Each payload below must produce an empty result:
    # 1 - Empty response (missing key returns empty result)
    # 2 - Valid response structure but no templates
    # 3 - Invalid response structure
    for payload in ({}, {"component_templates": []}, {"templates": []}):
        responses.get(url, json=payload)
        result = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
        self.assertEqual(0, len(result))

@responses.activate
def test_fetch_all_component_templates(self):
    # Serve a single-template response built from a known index definition
    expected = test_constants.BASE_INDICES_DATA[test_constants.INDEX3_NAME]
    responses.get(test_constants.SOURCE_ENDPOINT + "_component_template",
                  json=create_base_template_response("component_templates", expected))
    fetched = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
    # Result should contain one template
    self.assertEqual(1, len(fetched))
    info = fetched.pop()
    self.assertTrue(isinstance(info, ComponentTemplateInfo))
    self.assertEqual("test", info.get_name())
    # Settings and mappings must round-trip unchanged
    actual = info.get_template_definition()["template"]
    for key in (test_constants.SETTINGS_KEY, test_constants.MAPPINGS_KEY):
        self.assertEqual(expected[key], actual[key])

@responses.activate
def test_fetch_all_index_templates_empty(self):
    url = test_constants.SOURCE_ENDPOINT + "_index_template"
    # Each payload below must produce an empty result:
    # 1 - Empty response (missing key returns empty result)
    # 2 - Valid response structure but no templates
    # 3 - Invalid response structure
    for payload in ({}, {"index_templates": []}, {"templates": []}):
        responses.get(url, json=payload)
        result = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
        self.assertEqual(0, len(result))

@responses.activate
def test_fetch_all_index_templates(self):
    # Set up base response
    list_key = "index_templates"
    expected = test_constants.BASE_INDICES_DATA[test_constants.INDEX2_NAME]
    payload = create_base_template_response(list_key, expected)
    # Add fields specific to index templates
    index_template = payload[list_key][0][list_key[:-1]]
    index_template["index_patterns"] = ["test-*"]
    index_template["composed_of"] = ["test_component_template"]
    responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", json=payload)
    fetched = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
    # Result should contain one template
    self.assertEqual(1, len(fetched))
    info = fetched.pop()
    self.assertTrue(isinstance(info, IndexTemplateInfo))
    self.assertEqual("test", info.get_name())
    # Settings and mappings must round-trip unchanged
    actual = info.get_template_definition()["template"]
    for key in (test_constants.SETTINGS_KEY, test_constants.MAPPINGS_KEY):
        self.assertEqual(expected[key], actual[key])

@responses.activate
def test_fetch_all_templates_errors(self):
    # Set up error responses
    responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", body=requests.Timeout())
    responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", body=requests.HTTPError())
    # Both fetch functions must surface request errors as RuntimeError
    with self.assertRaises(RuntimeError):
        index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
    with self.assertRaises(RuntimeError):
        index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))


if __name__ == '__main__':
unittest.main()
15 changes: 12 additions & 3 deletions FetchMigration/python/tests/test_metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ def setUp(self) -> None:
def tearDown(self) -> None:
logging.disable(logging.NOTSET)

@patch('metadata_migration.template_migration')
@patch('index_operations.doc_count')
@patch('metadata_migration.write_output')
@patch('metadata_migration.print_report')
@patch('index_operations.create_indices')
@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock,
mock_print_report: MagicMock, mock_write_output: MagicMock, mock_doc_count: MagicMock):
mock_print_report: MagicMock, mock_write_output: MagicMock, mock_doc_count: MagicMock,
mock_template_migration: MagicMock):
mock_doc_count.return_value = IndexDocCount(1, dict())
index_to_create = test_constants.INDEX3_NAME
index_with_conflict = test_constants.INDEX2_NAME
Expand All @@ -58,6 +60,7 @@ def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: Ma
mock_doc_count.assert_called()
mock_print_report.assert_called_once_with(ANY, 1)
mock_write_output.assert_not_called()
mock_template_migration.assert_called_once()

@patch('index_operations.doc_count')
@patch('metadata_migration.print_report')
Expand Down Expand Up @@ -142,23 +145,28 @@ def test_missing_output_file_non_report(self):
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH)
self.assertRaises(ValueError, metadata_migration.run, test_input)

@patch('metadata_migration.template_migration')
@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_no_indices_in_source(self, mock_fetch_indices: MagicMock, mock_template_migration: MagicMock):
    # Source cluster reports no indices
    mock_fetch_indices.return_value = {}
    test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
    test_result = metadata_migration.run(test_input)
    mock_fetch_indices.assert_called_once()
    # Nothing to migrate, so the result is empty
    self.assertEqual(0, test_result.target_doc_count)
    self.assertEqual(0, len(test_result.migration_indices))
    # Templates are still migrated
    mock_template_migration.assert_called_once()

@patch('metadata_migration.write_output')
@patch('metadata_migration.template_migration')
@patch('index_operations.doc_count')
@patch('index_operations.create_indices')
@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_failed_indices(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock,
mock_doc_count: MagicMock, mock_write_output: MagicMock):
mock_doc_count: MagicMock, mock_template_migration: MagicMock,
mock_write_output: MagicMock):
mock_doc_count.return_value = IndexDocCount(1, dict())
# Setup failed indices
test_failed_indices_result = {
Expand All @@ -172,6 +180,7 @@ def test_failed_indices(self, mock_fetch_indices: MagicMock, mock_create_indices
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
self.assertRaises(RuntimeError, metadata_migration.run, test_input)
mock_create_indices.assert_called_once_with(test_constants.BASE_INDICES_DATA, ANY)
mock_template_migration.assert_not_called()


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 5562931

Please sign in to comment.