From 722a9dc89922391baafef55e27c2f4f3ad2c164e Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Sun, 12 Feb 2023 12:33:26 -0500 Subject: [PATCH] Working? --- contrib/storage-druid/.gitignore | 2 + contrib/storage-druid/pom.xml | 7 --- .../store/druid/DruidBatchRecordReader.java | 46 +++++++++++++------ .../exec/store/druid/DruidOffsetTracker.java | 5 ++ .../druid/DruidStoragePluginConfigTest.java | 16 ++++--- .../exec/store/druid/TestDruidQueries.java | 46 +++++++++++++++++-- 6 files changed, 89 insertions(+), 33 deletions(-) create mode 100644 contrib/storage-druid/.gitignore diff --git a/contrib/storage-druid/.gitignore b/contrib/storage-druid/.gitignore new file mode 100644 index 00000000000..9341ff44dc5 --- /dev/null +++ b/contrib/storage-druid/.gitignore @@ -0,0 +1,2 @@ +# Directory to store oauth tokens for testing Googlesheets Storage plugin +/src/test/resources/logback-test.xml diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml index deb1b0a99a2..4ded7e3c0d1 100755 --- a/contrib/storage-druid/pom.xml +++ b/contrib/storage-druid/pom.xml @@ -53,13 +53,6 @@ ${project.version} test - - org.assertj - assertj-core - - 3.11.1 - test - diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index b5811203d30..d9068954f62 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.store.druid.rest.DruidQueryClient; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; -import org.apache.drill.exec.vector.BaseValueVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +44,7 @@ public class DruidBatchRecordReader implements ManagedReader { private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class); + private static final int BATCH_SIZE = 4096; private static final ObjectMapper objectMapper = new ObjectMapper(); private final DruidStoragePlugin plugin; private final DruidSubScan.DruidSubScanSpec scanSpec; @@ -55,6 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader { private int maxRecordsToRead = -1; private JsonLoaderBuilder jsonBuilder; private JsonLoaderImpl jsonLoader; + private SchemaNegotiator negotiator; private ResultSetLoader resultSetLoader; private CustomErrorContext errorContext; @@ -75,34 +76,50 @@ public DruidBatchRecordReader(DruidSubScan subScan, @Override public boolean open(SchemaNegotiator negotiator) { - resultSetLoader = negotiator.build(); - errorContext = negotiator.parentErrorContext(); - negotiator.setErrorContext(errorContext); + this.negotiator = negotiator; + this.errorContext = this.negotiator.parentErrorContext(); + this.negotiator.batchSize(BATCH_SIZE); + this.negotiator.setErrorContext(errorContext); + + resultSetLoader = this.negotiator.build(); - jsonBuilder = new JsonLoaderBuilder() - .resultSetLoader(resultSetLoader) - .standardOptions(negotiator.queryOptions()) - .errorContext(errorContext); return true; } @Override public boolean next() { + jsonBuilder = new JsonLoaderBuilder() + .resultSetLoader(resultSetLoader) + .standardOptions(negotiator.queryOptions()) + .errorContext(errorContext); + int eventCounter = 0; boolean result = false; try { String query = getQuery(); + logger.debug("Executing query: {}", query); DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query); setNextOffset(druidScanResponse); + StringBuilder events = new StringBuilder(); for (ObjectNode eventNode : druidScanResponse.getEvents()) { - jsonLoader = (JsonLoaderImpl) jsonBuilder - .fromString(eventNode.toString()) + events.append(eventNode); + events.append("\n"); + eventCounter++; + } + + + jsonLoader = (JsonLoaderImpl) jsonBuilder + .fromString(events.toString()) .build(); - result = jsonLoader.readBatch(); + result = jsonLoader.readBatch(); + + if (eventCounter < BATCH_SIZE) { + return false; + } else { + return result; } - return result; } catch (Exception e) { throw UserException .dataReadError(e) @@ -123,8 +140,8 @@ public void close() { private String getQuery() throws JsonProcessingException { int queryThreshold = maxRecordsToRead >= 0 - ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead) - : BaseValueVector.INITIAL_VALUE_ALLOCATION; + ? Math.min(BATCH_SIZE, maxRecordsToRead) + : BATCH_SIZE; ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); ScanQuery scanQuery = scanQueryBuilder.build( @@ -140,7 +157,6 @@ private String getQuery() throws JsonProcessingException { } private void setNextOffset(DruidScanResponse druidScanResponse) { - //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size())); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java index da15b309cdc..16604f0b494 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java @@ -17,9 +17,13 @@ */ package org.apache.drill.exec.store.druid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigInteger; public class DruidOffsetTracker { + private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class); private BigInteger nextOffset; public DruidOffsetTracker() { @@ -32,5 +36,6 @@ public BigInteger getOffset() { public void setNextOffset(BigInteger offset) { nextOffset = nextOffset.add(offset); + logger.debug("Incrementing offset by {}", offset); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java index 354b23c5e5d..c84e5e05321 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java @@ -27,7 +27,9 @@ import java.io.IOException; import java.net.URISyntaxException; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class DruidStoragePluginConfigTest { @@ -39,11 +41,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed() Resources.getResource("bootstrap-storage-plugins.json").toURI())); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - assertThat(druidStoragePluginConfig).isNotNull(); - assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082"); - assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081"); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200); - assertThat(druidStoragePluginConfig.isEnabled()).isFalse(); + assertNotNull(druidStoragePluginConfig); + assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress()); + assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress()); + assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes()); + assertFalse(druidStoragePluginConfig.isEnabled()); } @Test @@ -58,6 +60,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig() JsonNode storagePluginJson = mapper.readTree(druidConfigStr); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100); + assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes()); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java index 1b0d9361eae..c4a3a43b578 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java @@ -20,10 +20,19 @@ import org.apache.drill.categories.DruidStorageTest; import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; + + @Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests") @Category({SlowTest.class, DruidStorageTest.class}) public class TestDruidQueries extends DruidTestBase { @@ -33,7 +42,7 @@ public void testStarQuery() throws Exception { testBuilder() .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA)) .unOrdered() - .expectsNumRecords(2) + .expectsNumRecords(876) .go(); } @@ -60,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception { testBuilder() .sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA)) .unOrdered() - .expectsNumRecords(3) + .expectsNumRecords(1) .go(); } @@ -72,7 +81,7 @@ public void testSingleColumnProject() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("comment") - .expectsNumRecords(24433) + .expectsNumRecords(876) .go(); } @@ -84,7 +93,36 @@ public void testCountAllRowsQuery() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("mycount") - .baselineValues(24433L) + .baselineValues(876L) .go(); } + + @Test + public void testGroupByQuery() throws Exception { + String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5",TEST_DATASOURCE_WIKIPEDIA); + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL) + .add("user_count", MinorType.BIGINT) + .buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema) + .addRow("Main", 702) + .addRow("User talk", 29) + .addRow("Wikipedia", 26) + .addRow("Talk", 17) + .addRow("User", 12) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSerDe() throws Exception { + String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA); + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + assertEquals("Counts should match", 876L, cnt); + } }