Commit f4d9af9

Working?

cgivre committed Jan 1, 2024
1 parent fca094b commit f4d9af9
Showing 6 changed files with 89 additions and 33 deletions.
2 changes: 2 additions & 0 deletions contrib/storage-druid/.gitignore
@@ -0,0 +1,2 @@
+# Logback configuration used only for local testing
+/src/test/resources/logback-test.xml
7 changes: 0 additions & 7 deletions contrib/storage-druid/pom.xml
@@ -53,13 +53,6 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.assertj</groupId>
-      <artifactId>assertj-core</artifactId>
-      <!-- use 2.9.1 for Java 7 projects -->
-      <version>3.11.1</version>
-      <scope>test</scope>
-    </dependency>
   </dependencies>

   <build>
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
@@ -35,7 +35,6 @@
 import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
-import org.apache.drill.exec.vector.BaseValueVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -45,6 +44,7 @@

 public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
+  private static final int BATCH_SIZE = 4096;
   private static final ObjectMapper objectMapper = new ObjectMapper();
   private final DruidStoragePlugin plugin;
   private final DruidSubScan.DruidSubScanSpec scanSpec;
@@ -55,6 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
   private int maxRecordsToRead = -1;
   private JsonLoaderBuilder jsonBuilder;
   private JsonLoaderImpl jsonLoader;
+  private SchemaNegotiator negotiator;
   private ResultSetLoader resultSetLoader;
   private CustomErrorContext errorContext;

@@ -75,34 +76,50 @@ public DruidBatchRecordReader(DruidSubScan subScan,

   @Override
   public boolean open(SchemaNegotiator negotiator) {
-    resultSetLoader = negotiator.build();
-    errorContext = negotiator.parentErrorContext();
-    negotiator.setErrorContext(errorContext);
-    jsonBuilder = new JsonLoaderBuilder()
-      .resultSetLoader(resultSetLoader)
-      .standardOptions(negotiator.queryOptions())
-      .errorContext(errorContext);
+    this.negotiator = negotiator;
+    this.errorContext = this.negotiator.parentErrorContext();
+    this.negotiator.batchSize(BATCH_SIZE);
+    this.negotiator.setErrorContext(errorContext);
+
+    resultSetLoader = this.negotiator.build();

     return true;
   }

   @Override
   public boolean next() {
+    jsonBuilder = new JsonLoaderBuilder()
+      .resultSetLoader(resultSetLoader)
+      .standardOptions(negotiator.queryOptions())
+      .errorContext(errorContext);
+    int eventCounter = 0;
     boolean result = false;
     try {
       String query = getQuery();
       logger.debug("Executing query: {}", query);
       DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
       setNextOffset(druidScanResponse);

+      StringBuilder events = new StringBuilder();
       for (ObjectNode eventNode : druidScanResponse.getEvents()) {
-        jsonLoader = (JsonLoaderImpl) jsonBuilder
-          .fromString(eventNode.toString())
-          .build();
-        result = jsonLoader.readBatch();
+        events.append(eventNode);
+        events.append("\n");
+        eventCounter++;
       }

-      return result;
+      jsonLoader = (JsonLoaderImpl) jsonBuilder
+        .fromString(events.toString())
+        .build();
+
+      result = jsonLoader.readBatch();
+
+      if (eventCounter < BATCH_SIZE) {
+        return false;
+      } else {
+        return result;
+      }
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
@@ -123,8 +140,8 @@ public void close() {
   private String getQuery() throws JsonProcessingException {
     int queryThreshold =
       maxRecordsToRead >= 0
-        ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead)
-        : BaseValueVector.INITIAL_VALUE_ALLOCATION;
+        ? Math.min(BATCH_SIZE, maxRecordsToRead)
+        : BATCH_SIZE;
     ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
     ScanQuery scanQuery =
       scanQueryBuilder.build(
@@ -140,7 +157,6 @@ private String getQuery() throws JsonProcessingException {
   }

   private void setNextOffset(DruidScanResponse druidScanResponse) {
-    //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
     offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
   }
 }
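Taken together, the reader changes above implement a simple paging contract: each next() call issues one Druid scan query capped at BATCH_SIZE rows, concatenates the returned events into a single newline-delimited JSON payload for the JSON loader, advances the offset tracker by the number of events received, and reports end-of-data once a response comes back short. A minimal standalone sketch of that loop follows; PageClient and readAll are hypothetical stand-ins for DruidQueryClient and the Drill scan framework, not actual Drill APIs.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class DruidPagingSketch {
  private static final int BATCH_SIZE = 4096; // same cap as the reader above

  // Hypothetical stand-in for DruidQueryClient.executeQuery().
  interface PageClient {
    List<String> fetch(BigInteger offset, int limit);
  }

  static void readAll(PageClient client) {
    BigInteger offset = BigInteger.ZERO; // mirrors DruidOffsetTracker
    while (true) {
      List<String> events = client.fetch(offset, BATCH_SIZE);
      // Batch all events into one newline-delimited JSON payload,
      // as next() now does, instead of one loader pass per event.
      StringBuilder payload = new StringBuilder();
      for (String event : events) {
        payload.append(event).append('\n');
      }
      System.out.println("loaded batch of " + events.size()
          + " events (" + payload.length() + " chars)");
      // Advance the offset by the number of events actually returned.
      offset = offset.add(BigInteger.valueOf(events.size()));
      if (events.size() < BATCH_SIZE) {
        break; // short page: Druid has no more rows
      }
    }
  }

  public static void main(String[] args) {
    // Fake client returning 5000 events total, so the sketch pages twice.
    readAll((offset, limit) -> {
      int remaining = Math.max(5000 - offset.intValue(), 0);
      List<String> page = new ArrayList<>();
      for (int i = 0; i < Math.min(limit, remaining); i++) {
        page.add("{\"n\":" + (offset.intValue() + i) + "}");
      }
      return page;
    });
  }
}
```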
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
@@ -17,9 +17,13 @@
  */
 package org.apache.drill.exec.store.druid;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.math.BigInteger;

 public class DruidOffsetTracker {
+  private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class);
   private BigInteger nextOffset;

   public DruidOffsetTracker() {
@@ -32,5 +36,6 @@ public BigInteger getOffset() {

   public void setNextOffset(BigInteger offset) {
     nextOffset = nextOffset.add(offset);
+    logger.debug("Incrementing offset by {}", offset);
   }
 }
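Note that setNextOffset() accumulates rather than assigns. A brief usage sketch of the tracker across two pages, assuming the constructor (truncated in the diff above) initializes nextOffset to zero:

```java
import java.math.BigInteger;
import org.apache.drill.exec.store.druid.DruidOffsetTracker;

public class OffsetTrackerSketch {
  public static void main(String[] args) {
    // Assumes nextOffset starts at zero (constructor body is truncated above).
    DruidOffsetTracker tracker = new DruidOffsetTracker();
    tracker.setNextOffset(BigInteger.valueOf(4096)); // first full page of events
    tracker.setNextOffset(BigInteger.valueOf(812));  // short final page
    System.out.println(tracker.getOffset());         // prints 4908: offsets accumulate
  }
}
```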
contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
@@ -28,7 +28,9 @@
 import java.io.IOException;
 import java.net.URISyntaxException;

-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;

 public class DruidStoragePluginConfigTest {

@@ -40,11 +42,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed()
         Resources.getResource("bootstrap-storage-plugins.json").toURI()));
     DruidStoragePluginConfig druidStoragePluginConfig =
         mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
-    assertThat(druidStoragePluginConfig).isNotNull();
-    assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082");
-    assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081");
-    assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200);
-    assertThat(druidStoragePluginConfig.isEnabled()).isFalse();
+    assertNotNull(druidStoragePluginConfig);
+    assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress());
+    assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress());
+    assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes());
+    assertFalse(druidStoragePluginConfig.isEnabled());
   }

   @Test
@@ -59,6 +61,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
     JsonNode storagePluginJson = mapper.readTree(druidConfigStr);
     DruidStoragePluginConfig druidStoragePluginConfig =
         mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
-    assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100);
+    assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes());
   }
 }
contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
@@ -20,10 +20,19 @@

 import org.apache.drill.categories.DruidStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

+import static org.junit.Assert.assertEquals;
+
+
 @Ignore("These tests require a running Druid instance. You may start Druid using the docker-compose file provided in resources/druid, then enable these tests.")
 @Category({SlowTest.class, DruidStorageTest.class})
 public class TestDruidQueries extends DruidTestBase {
@@ -33,7 +42,7 @@ public void testStarQuery() throws Exception {
     testBuilder()
       .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA))
       .unOrdered()
-      .expectsNumRecords(2)
+      .expectsNumRecords(876)
       .go();
   }

@@ -60,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception {
     testBuilder()
       .sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA))
       .unOrdered()
-      .expectsNumRecords(3)
+      .expectsNumRecords(1)
       .go();
   }

@@ -72,7 +81,7 @@ public void testSingleColumnProject() throws Exception {
       .sqlQuery(query)
       .unOrdered()
       .baselineColumns("comment")
-      .expectsNumRecords(24433)
+      .expectsNumRecords(876)
       .go();
   }

@@ -84,7 +93,36 @@ public void testCountAllRowsQuery() throws Exception {
       .sqlQuery(query)
       .unOrdered()
       .baselineColumns("mycount")
-      .baselineValues(24433L)
+      .baselineValues(876L)
       .go();
   }
+
+  @Test
+  public void testGroupByQuery() throws Exception {
+    String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5", TEST_DATASOURCE_WIKIPEDIA);
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL)
+      .add("user_count", MinorType.BIGINT)
+      .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow("Main", 702)
+      .addRow("User talk", 29)
+      .addRow("Wikipedia", 26)
+      .addRow("Talk", 17)
+      .addRow("User", 12)
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA);
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 876L, cnt);
+  }
 }
