Skip to content

Commit

Permalink
Allow flexible DateTime pattern location in index name (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#4000)

* Allow datetime pattern anywhere in index name

Signed-off-by: Hai Yan <[email protected]>

* Add integ tests

Signed-off-by: Hai Yan <[email protected]>

* Remove leading dash in indexPrefix

Signed-off-by: Hai Yan <[email protected]>

* Update index patterns

Signed-off-by: Hai Yan <[email protected]>

* Add integ tests for verifying index template creation

Signed-off-by: Hai Yan <[email protected]>

* Update readme

Signed-off-by: Hai Yan <[email protected]>

* Update unit tests to address review comments

Signed-off-by: Hai Yan <[email protected]>

* Use regex Pattern for replaceAll

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Jan 24, 2024
1 parent 084c739 commit bb132af
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 72 deletions.
3 changes: 2 additions & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ Default is null.
```
- <a name="index"></a>`index`: A String used as index name for custom data type. Applicable and required only If `index_type` is explicitly `custom` or defaults to be `custom`.
* This index name can be a plain string, such as `application`, `my-index-name`.
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a plain string with a date-time pattern, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`, `index-%{yyyy-MM-dd}-dev`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.
- Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
- `normalize_index` (optional): If true, the plugin will try to make dynamic index names (index names with format options specified in `${}`) valid according to [index naming restrictions](https://opensearch.org/docs/2.11/api-reference/index-apis/create-index/#index-naming-restrictions). Any invalid characters will be removed. Default value is false.
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. Note: when `distribution_version` is `es6`, `template_type` is enforced into `v1`.

- <a name="template_file"></a>`template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,57 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates(

}

@ParameterizedTest
@ArgumentsSource(CreateWithIndexTemplateArgumentsProvider.class)
public void testIndexNameWithDateNotAsSuffixCreatesIndexTemplate(
final String templateType,
final String templatePath,
final String templateFile,
final BiFunction<Map<String, Object>, String, Integer> extractVersionFunction,
final BiFunction<Map<String, Object>, String, Map<String, Object>> extractMappingsFunction) throws IOException {
final String testIndexAlias = "prefix-%{yyyy-MM}-suffix";
final String expectedDate = new SimpleDateFormat("yyyy-MM").format(new Date());
final String expectedIndexName = "prefix-" + expectedDate + "-suffix";
final String expectedIndexTemplateName = "prefix-suffix-index-template";
final String testTemplateFileV1 = getClass().getClassLoader().getResource(templateFile).getFile();

PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);

final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(
OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : "";
Request getTemplateRequest = new Request(HttpMethod.GET,
"/" + templatePath + "/" + expectedIndexTemplateName + extraURI);
Response getTemplateResponse = client.performRequest(getTemplateRequest);
assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK));

String responseBody = EntityUtils.toString(getTemplateResponse.getEntity());
@SuppressWarnings("unchecked") final Integer responseVersion =
extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(),
responseBody).map(), expectedIndexTemplateName);
@SuppressWarnings("unchecked") final Map<String, Object> templateMappings =
extractMappingsFunction.apply(createContentParser(XContentType.JSON.xContent(),
responseBody).map(), expectedIndexTemplateName);
assertThat(responseVersion, equalTo(Integer.valueOf(1)));
assertThat(templateMappings.isEmpty(), equalTo(false));

Request getIndexRequest = new Request(HttpMethod.GET,
"/" + expectedIndexName + extraURI);
Response getIndexResponse = client.performRequest(getIndexRequest);
assertThat(getIndexResponse.getStatusLine().getStatusCode(), equalTo(SC_OK));

String getIndexResponseBody = EntityUtils.toString(getIndexResponse.getEntity());
@SuppressWarnings("unchecked") final Map<String, Object> indexBlob = (Map<String, Object>) createContentParser(XContentType.JSON.xContent(),
getIndexResponseBody).map().get(expectedIndexName);
@SuppressWarnings("unchecked") final Map<String, Object> mappingsBlob = (Map<String, Object>) indexBlob.get("mappings");
assertThat(mappingsBlob.isEmpty(), equalTo(false));

// assert the mappings from index template are applied to the index
assertThat(mappingsBlob, equalTo(templateMappings));

sink.shutdown();
}

static class CreateWithTemplatesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
Expand All @@ -610,6 +661,35 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
}
}

static class CreateWithIndexTemplateArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
final List<Arguments> arguments = new ArrayList<>();
arguments.add(
arguments("v1", "_template",
TEST_TEMPLATE_V1_FILE,
(BiFunction<Map<String, Object>, String, Integer>) (map, templateName) ->
(Integer) ((Map<String, Object>) map.get(templateName)).get("version"),
(BiFunction<Map<String, Object>, String, Map<String, Object>>) (map, templateName) ->
(Map<String, Object>) ((Map<String, Object>) map.get(templateName)).get("mappings")
)
);

if (OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.OPENDISTRO_1_9) >= 0) {
arguments.add(
arguments("index-template", "_index_template",
TEST_INDEX_TEMPLATE_V1_FILE,
(BiFunction<Map<String, Object>, String, Integer>) (map, unused) ->
(Integer) ((List<Map<String, Map<String, Object>>>) map.get("index_templates")).get(0).get("index_template").get("version"),
(BiFunction<Map<String, Object>, String, Map<String, Object>>) (map, templateName) ->
(Map<String, Object>) ((List<Map<String, Map<String, Map<String, Object>>>>) map.get("index_templates")).get(0).get("index_template").get("template").get("mappings")
)
);
}
return arguments.stream();
}
}

static class CreateSingleWithTemplatesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
Expand Down Expand Up @@ -1268,6 +1348,42 @@ public void testOpenSearchDynamicIndexWithDate(final String testIndex, final Str
sink.shutdown();
}

@ParameterizedTest
@CsvSource({
"id, yyyy-MM-dd, %{yyyy-MM-dd}-test-${id}-index",
"id, yyyy-MM-dd, test-%{yyyy-MM-dd}-${id}-index",
"id, yyyy-MM-dd, test-${id}-%{yyyy-MM-dd}-index",
})
public void testOpenSearchDynamicIndexWithDateNotAsSuffix(
final String testIndex, final String testDatePattern, final String dynamicTestIndexAlias) throws IOException {
final String testIndexName = "idx1";
SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern);
Date date = new Date();
String expectedDate = formatter.format(date);
final String expectedIndexAlias = dynamicTestIndexAlias
.replace("%{yyyy-MM-dd}", expectedDate)
.replace("${id}", testIndexName);
final String data = UUID.randomUUID().toString();
final Map<String, Object> dataMap = Map.of("data", data);
final Event testEvent = JacksonEvent.builder()
.withData(dataMap)
.withEventType("event")
.build();
testEvent.put(testIndex, testIndexName);

Map<String, Object> expectedMap = testEvent.toMap();

final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expectedIndexAlias);
assertThat(retSources.size(), equalTo(1));
assertThat(retSources, hasItem(expectedMap));
sink.shutdown();
}

@ParameterizedTest
@ValueSource(strings = {"yyyy-MM", "yyyy-MM-dd", "dd-MM-yyyy"})
public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOException, InterruptedException {
Expand Down Expand Up @@ -1296,6 +1412,34 @@ public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOE
sink.shutdown();
}

@ParameterizedTest
@ValueSource(strings = {"test-%{yyyy-MM-dd}-index", "%{yyyy-MM-dd}-test-index"})
public void testOpenSearchIndexWithDateNotAsSuffix(final String testIndexAlias) throws IOException, InterruptedException {
final String testDatePattern = "yyyy-MM-dd";
SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern);
Date date = new Date();
String expectedIndexName = testIndexAlias.replace("%{yyyy-MM-dd}", formatter.format(date));

final String data = UUID.randomUUID().toString();
final Map<String, Object> dataMap = Map.of("data", data);
final Event testEvent = JacksonEvent.builder()
.withData(dataMap)
.withEventType("event")
.build();

Map<String, Object> expectedMap = testEvent.toMap();

final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expectedIndexName);
assertThat(retSources.size(), equalTo(1));
assertThat(retSources, hasItem(expectedMap));
sink.shutdown();
}

@Test
public void testOpenSearchIndexWithInvalidDate() throws IOException, InterruptedException {
String invalidDatePattern = "yyyy-MM-dd HH:ss:mm";
Expand Down
Loading

0 comments on commit bb132af

Please sign in to comment.