diff --git a/.github/workflows/opensearch-sink-opendistro-integration-tests.yml b/.github/workflows/opensearch-sink-opendistro-integration-tests.yml index 210f6b4539..6f4ac3d457 100644 --- a/.github/workflows/opensearch-sink-opendistro-integration-tests.yml +++ b/.github/workflows/opensearch-sink-opendistro-integration-tests.yml @@ -20,7 +20,7 @@ jobs: strategy: matrix: java: [11] - opendistro: [1.3.0, 1.6.0, 1.8.0, 1.9.0, 1.11.0, 1.12.0, 1.13.3] + opendistro: [0.10.0, 1.3.0, 1.6.0, 1.8.0, 1.9.0, 1.11.0, 1.12.0, 1.13.3] fail-fast: false runs-on: ubuntu-latest diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java index ff79a0f555..8f62098746 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java @@ -14,6 +14,7 @@ class DeclaredOpenSearchVersion implements Comparable { private static final DeclaredOpenSearchVersion DEFAULT = new DeclaredOpenSearchVersion(Distribution.OPENSEARCH, "1.0.0"); public static final DeclaredOpenSearchVersion OPENDISTRO_1_9 = new DeclaredOpenSearchVersion(Distribution.OPENDISTRO, "1.9.0"); + public static final DeclaredOpenSearchVersion OPENDISTRO_0_10 = new DeclaredOpenSearchVersion(Distribution.OPENDISTRO, "0.10.0"); enum Distribution { OPENDISTRO, diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java index 820c39b820..073afa168e 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java @@ -44,12 +44,13 @@ void parse_with_null_should_return_minimum_version() { @CsvSource({ "opensearch:1.2.4,opensearch:1.2.4,0", "opendistro:1.13.3,opendistro:1.13.3,0", + "opendistro:0.10.0,opendistro:1.0.0,-1", "opensearch:1.2.4,opensearch:1.2.3,1", "opensearch:1.2.3,opensearch:1.2.4,-1", "opensearch:2.6.0,opensearch:1.2.3,1", "opensearch:1.2.3,opensearch:2.6.0,-1", "opensearch:1.3.9,opendistro:1.13.3,1", - "opendistro:1.13.3,opensearch:1.3.9,-1" + "opendistro:1.13.3,opensearch:1.3.9,-1", }) void compareTo_returns_correct_value(final String versionStringToTest, final String otherVersionString, final int expectedCompareTo) { final DeclaredOpenSearchVersion objectUnderTest = DeclaredOpenSearchVersion.parse(versionStringToTest); diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index de150c820b..84aa8b174e 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledIf; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -109,6 +110,8 @@ public class OpenSearchSinkIT { private static final String DEFAULT_RAW_SPAN_FILE_1 = "raw-span-1.json"; private static final String DEFAULT_RAW_SPAN_FILE_2 = "raw-span-2.json"; private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json"; + private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false"; + private static final String TRACE_INGESTION_TEST_DISABLED_REASON = "Trace ingestion is not supported for ES 6"; private RestClient client; private EventHandle eventHandle; @@ -169,6 +172,7 @@ public void cleanOpenSearch() throws Exception { } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkRawSpanDefault() throws IOException { final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); @@ -212,6 +216,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException { final String reservedIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final Request request = new Request(HttpMethod.PUT, reservedIndexAlias); @@ -222,6 +227,7 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I RuntimeException.class, () -> sink.doInitialize()); } + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) @ParameterizedTest @CsvSource({"true,true", "true,false", "false,true", "false,false"}) public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression, @@ -290,6 +296,7 @@ public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompress MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) @ParameterizedTest @CsvSource({"true,true", "true,false", "false,true", "false,false"}) public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression, @@ -352,6 +359,7 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkServiceMapDefault() throws IOException { final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); @@ -422,7 +430,9 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - final Request request = new Request(HttpMethod.HEAD, testIndexAlias); + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + final Request request = new Request(HttpMethod.HEAD, testIndexAlias + extraURI); final Response response = client.performRequest(request); MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); sink.shutdown(); @@ -433,6 +443,7 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { final String indexAlias = "sink-custom-index-ism-test-alias"; final String testTemplateFile = Objects.requireNonNull( @@ -441,7 +452,9 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE); final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - Request request = new Request(HttpMethod.HEAD, indexAlias); + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + Request request = new Request(HttpMethod.HEAD, indexAlias + extraURI); Response response = client.performRequest(request); MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); final String index = String.format("%s-000001", indexAlias); @@ -497,7 +510,10 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - Request getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + Request getTemplateRequest = new Request(HttpMethod.GET, + "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); Response getTemplateResponse = client.performRequest(getTemplateRequest); MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); @@ -513,7 +529,8 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV2); sink = createObjectUnderTest(pluginSetting, true); - getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + getTemplateRequest = new Request(HttpMethod.GET, + "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); getTemplateResponse = client.performRequest(getTemplateRequest); MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); @@ -529,7 +546,8 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); sink = createObjectUnderTest(pluginSetting, true); - getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + getTemplateRequest = new Request(HttpMethod.GET, + "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); getTemplateResponse = client.performRequest(getTemplateRequest); MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); @@ -623,6 +641,7 @@ public void testBulkActionCreate() throws IOException, InterruptedException { } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testEventOutputWithTags() throws IOException, InterruptedException { final Event testEvent = JacksonEvent.builder() .withData("{\"log\": \"foobar\"}") @@ -651,6 +670,7 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { } @Test + @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testEventOutput() throws IOException, InterruptedException { final Event testEvent = JacksonEvent.builder() @@ -838,6 +858,8 @@ public void testOpenSearchIndexWithInvalidChars() throws IOException, Interrupte @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) + @DisabledIf(value = "isES6", + disabledReason = "PUT _opendistro/_security/api/roles/ request could not be parsed in ES 6.") public void testOutputManagementDisabled() throws IOException, InterruptedException { final String testIndexAlias = "test-" + UUID.randomUUID(); final String roleName = UUID.randomUUID().toString(); @@ -894,6 +916,10 @@ private Map initializeConfigurationMetadata(final String indexTy metadata.put(ConnectionConfiguration.USERNAME, user); metadata.put(ConnectionConfiguration.PASSWORD, password); } + final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? + DistributionVersion.ES6.getVersion() : DistributionVersion.DEFAULT.getVersion(); + metadata.put(IndexConfiguration.DISTRIBUTION_VERSION, distributionVersion); return metadata; } @@ -1021,7 +1047,9 @@ private List> getSearchResponseDocSources(final String index } private Map getIndexMappings(final String index) throws IOException { - final Request request = new Request(HttpMethod.GET, index + "/_mappings"); + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + final Request request = new Request(HttpMethod.GET, index + "/_mappings" + extraURI); final Response response = client.performRequest(request); final String responseBody = EntityUtils.toString(response.getEntity()); @@ -1045,7 +1073,10 @@ private String getIndexPolicyId(final String index) throws IOException { @SuppressWarnings("unchecked") private void wipeAllOpenSearchIndices() throws IOException { - final Response response = client.performRequest(new Request("GET", "/*?expand_wildcards=all")); + final String getIndicesEndpoint = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? + "/*?expand_wildcards=all&include_type_name=false" : "/*?expand_wildcards=all"; + final Response response = client.performRequest(new Request("GET", getIndicesEndpoint)); final String responseBody = EntityUtils.toString(response.getEntity()); final Map indexContent = createContentParser(XContentType.JSON.xContent(), responseBody).map(); @@ -1098,4 +1129,8 @@ private void createV1IndexTemplate(final String templateName, final String index request.setJsonEntity(createTemplateJson); final Response response = client.performRequest(request); } + + private static boolean isES6() { + return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0; + } }