diff --git a/contrib/storage-druid/.gitignore b/contrib/storage-druid/.gitignore
new file mode 100644
index 00000000000..9341ff44dc5
--- /dev/null
+++ b/contrib/storage-druid/.gitignore
@@ -0,0 +1,2 @@
+# Ignore local logback test configuration used when testing the Druid storage plugin
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml
index deb1b0a99a2..4ded7e3c0d1 100755
--- a/contrib/storage-druid/pom.xml
+++ b/contrib/storage-druid/pom.xml
@@ -53,13 +53,6 @@
<version>${project.version}</version>
<scope>test</scope>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <!-- use 2.9.1 for Java 7 projects -->
- <version>3.11.1</version>
- <scope>test</scope>
- </dependency>
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
index b5811203d30..d9068954f62 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
@@ -35,7 +35,6 @@
import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
-import org.apache.drill.exec.vector.BaseValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@
public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
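+ // Maximum number of Druid events fetched and loaded per scan batch.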
+ private static final int BATCH_SIZE = 4096;
private static final ObjectMapper objectMapper = new ObjectMapper();
private final DruidStoragePlugin plugin;
private final DruidSubScan.DruidSubScanSpec scanSpec;
@@ -55,6 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private int maxRecordsToRead = -1;
private JsonLoaderBuilder jsonBuilder;
private JsonLoaderImpl jsonLoader;
+ private SchemaNegotiator negotiator;
private ResultSetLoader resultSetLoader;
private CustomErrorContext errorContext;
@@ -75,34 +76,50 @@ public DruidBatchRecordReader(DruidSubScan subScan,
@Override
public boolean open(SchemaNegotiator negotiator) {
- resultSetLoader = negotiator.build();
- errorContext = negotiator.parentErrorContext();
- negotiator.setErrorContext(errorContext);
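+ // Keep a reference to the negotiator so next() can rebuild the JSON loader with its query options.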
+ this.negotiator = negotiator;
+ this.errorContext = this.negotiator.parentErrorContext();
+ this.negotiator.batchSize(BATCH_SIZE);
+ this.negotiator.setErrorContext(errorContext);
+
+ resultSetLoader = this.negotiator.build();
- jsonBuilder = new JsonLoaderBuilder()
- .resultSetLoader(resultSetLoader)
- .standardOptions(negotiator.queryOptions())
- .errorContext(errorContext);
return true;
}
@Override
public boolean next() {
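+ // Build a fresh JsonLoaderBuilder for each batch instead of configuring it once in open().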
+ jsonBuilder = new JsonLoaderBuilder()
+ .resultSetLoader(resultSetLoader)
+ .standardOptions(negotiator.queryOptions())
+ .errorContext(errorContext);
+ int eventCounter = 0;
boolean result = false;
try {
String query = getQuery();
+ logger.debug("Executing query: {}", query);
DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
setNextOffset(druidScanResponse);
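+ // Concatenate the returned events into newline-delimited JSON so the whole batch is parsed in one JsonLoader pass.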
+ StringBuilder events = new StringBuilder();
for (ObjectNode eventNode : druidScanResponse.getEvents()) {
- jsonLoader = (JsonLoaderImpl) jsonBuilder
- .fromString(eventNode.toString())
+ events.append(eventNode);
+ events.append("\n");
+ eventCounter++;
+ }
+
+ jsonLoader = (JsonLoaderImpl) jsonBuilder
+ .fromString(events.toString())
.build();
- result = jsonLoader.readBatch();
+ result = jsonLoader.readBatch();
+
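+ // Fewer events than BATCH_SIZE means Druid has no more rows, so signal end of data.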
+ if (eventCounter < BATCH_SIZE) {
+ return false;
+ } else {
+ return result;
}
- return result;
} catch (Exception e) {
throw UserException
.dataReadError(e)
@@ -123,8 +140,8 @@ public void close() {
private String getQuery() throws JsonProcessingException {
int queryThreshold =
maxRecordsToRead >= 0
- ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead)
- : BaseValueVector.INITIAL_VALUE_ALLOCATION;
+ ? Math.min(BATCH_SIZE, maxRecordsToRead)
+ : BATCH_SIZE;
ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
ScanQuery scanQuery =
scanQueryBuilder.build(
@@ -140,7 +157,6 @@ private String getQuery() throws JsonProcessingException {
}
private void setNextOffset(DruidScanResponse druidScanResponse) {
- //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
}
}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
index da15b309cdc..16604f0b494 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
@@ -17,9 +17,13 @@
*/
package org.apache.drill.exec.store.druid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.math.BigInteger;
public class DruidOffsetTracker {
+ private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class);
private BigInteger nextOffset;
public DruidOffsetTracker() {
@@ -32,5 +36,6 @@ public BigInteger getOffset() {
public void setNextOffset(BigInteger offset) {
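+ // Advance the running offset by the number of events just consumed.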
nextOffset = nextOffset.add(offset);
+ logger.debug("Incrementing offset by {}", offset);
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
index 354b23c5e5d..c84e5e05321 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
@@ -27,7 +27,9 @@
import java.io.IOException;
import java.net.URISyntaxException;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class DruidStoragePluginConfigTest {
@@ -39,11 +41,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed()
Resources.getResource("bootstrap-storage-plugins.json").toURI()));
DruidStoragePluginConfig druidStoragePluginConfig =
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
- assertThat(druidStoragePluginConfig).isNotNull();
- assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082");
- assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081");
- assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200);
- assertThat(druidStoragePluginConfig.isEnabled()).isFalse();
+ assertNotNull(druidStoragePluginConfig);
+ assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress());
+ assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress());
+ assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes());
+ assertFalse(druidStoragePluginConfig.isEnabled());
}
@Test
@@ -58,6 +60,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
JsonNode storagePluginJson = mapper.readTree(druidConfigStr);
DruidStoragePluginConfig druidStoragePluginConfig =
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
- assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100);
+ assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes());
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
index 1b0d9361eae..c4a3a43b578 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
@@ -20,10 +20,19 @@
import org.apache.drill.categories.DruidStorageTest;
import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
@Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests")
@Category({SlowTest.class, DruidStorageTest.class})
public class TestDruidQueries extends DruidTestBase {
@@ -33,7 +42,7 @@ public void testStarQuery() throws Exception {
testBuilder()
.sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA))
.unOrdered()
- .expectsNumRecords(2)
+ .expectsNumRecords(876)
.go();
}
@@ -60,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception {
testBuilder()
.sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA))
.unOrdered()
- .expectsNumRecords(3)
+ .expectsNumRecords(1)
.go();
}
@@ -72,7 +81,7 @@ public void testSingleColumnProject() throws Exception {
.sqlQuery(query)
.unOrdered()
.baselineColumns("comment")
- .expectsNumRecords(24433)
+ .expectsNumRecords(876)
.go();
}
@@ -84,7 +93,36 @@ public void testCountAllRowsQuery() throws Exception {
.sqlQuery(query)
.unOrdered()
.baselineColumns("mycount")
- .baselineValues(24433L)
+ .baselineValues(876L)
.go();
}
+
+ @Test
+ public void testGroupByQuery() throws Exception {
+ String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5", TEST_DATASOURCE_WIKIPEDIA);
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
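+ // Expected schema: nullable VARCHAR for the namespace dimension and a required BIGINT for the count.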
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL)
+ .add("user_count", MinorType.BIGINT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow("Main", 702)
+ .addRow("User talk", 29)
+ .addRow("Wikipedia", 26)
+ .addRow("Talk", 17)
+ .addRow("User", 12)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA);
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match", 876L, cnt);
+ }
}