Skip to content

Commit

Permalink
Handle lastEvaluatedKey for query in DynamoDB (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Mar 28, 2022
1 parent 3465059 commit df4e06f
Show file tree
Hide file tree
Showing 12 changed files with 622 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class StorageIntegrationTestBase {
private static final String COL_NAME3 = "c3";
private static final String COL_NAME4 = "c4";
private static final String COL_NAME5 = "c5";
private static final String COL_NAME6 = "c6";

private static boolean initialized;
private static DistributedStorage storage;
Expand Down Expand Up @@ -96,6 +97,7 @@ private void createTable() throws ExecutionException {
.addColumn(COL_NAME3, DataType.INT)
.addColumn(COL_NAME4, DataType.INT)
.addColumn(COL_NAME5, DataType.BOOLEAN)
.addColumn(COL_NAME6, DataType.BLOB)
.addPartitionKey(COL_NAME1)
.addClusteringKey(COL_NAME4)
.addSecondaryIndex(COL_NAME3)
Expand Down Expand Up @@ -1305,7 +1307,7 @@ public void scan_ScanLargeData_ShouldRetrieveExpectedValues()
Key partitionKey = new Key(COL_NAME1, 1);
for (int i = 0; i < 345; i++) {
Key clusteringKey = new Key(COL_NAME4, i);
storage.put(new Put(partitionKey, clusteringKey));
storage.put(new Put(partitionKey, clusteringKey).withValue(COL_NAME6, new byte[5000]));
}
Scan scan = new Scan(partitionKey);

Expand All @@ -1329,7 +1331,7 @@ public void scan_ScanLargeDataWithOrdering_ShouldRetrieveExpectedValues()
Key partitionKey = new Key(COL_NAME1, 1);
for (int i = 0; i < 345; i++) {
Key clusteringKey = new Key(COL_NAME4, i);
storage.put(new Put(partitionKey, clusteringKey));
storage.put(new Put(partitionKey, clusteringKey).withValue(COL_NAME6, new byte[5000]));
}
Scan scan = new Scan(partitionKey).withOrdering(new Ordering(COL_NAME4, Order.ASC));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.scalar.db.storage.dynamo;

import static com.google.common.base.Preconditions.checkNotNull;

import com.scalar.db.api.Delete;
import com.scalar.db.api.DeleteIfExists;
import com.scalar.db.api.Operation;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.storage.NoMutationException;
import com.scalar.db.exception.storage.RetriableExecutionException;
import com.scalar.db.util.TableMetadataManager;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand All @@ -26,19 +24,17 @@
* @author Yuji Ito
*/
@ThreadSafe
public class DeleteStatementHandler extends StatementHandler {
public class DeleteStatementHandler {
private final DynamoDbClient client;
private final TableMetadataManager metadataManager;

public DeleteStatementHandler(DynamoDbClient client, TableMetadataManager metadataManager) {
super(client, metadataManager);
this.client = checkNotNull(client);
this.metadataManager = checkNotNull(metadataManager);
}

@Nonnull
@Override
public List<Map<String, AttributeValue>> handle(Operation operation) throws ExecutionException {
checkArgument(operation, Delete.class);
Delete delete = (Delete) operation;

TableMetadata tableMetadata = metadataManager.getTableMetadata(operation);
public void handle(Delete delete) throws ExecutionException {
TableMetadata tableMetadata = metadataManager.getTableMetadata(delete);
try {
delete(delete, tableMetadata);
} catch (ConditionalCheckFailedException e) {
Expand All @@ -48,8 +44,6 @@ public List<Map<String, AttributeValue>> handle(Operation operation) throws Exec
} catch (DynamoDbException e) {
throw new ExecutionException(e.getMessage(), e);
}

return Collections.emptyList();
}

private void delete(Delete delete, TableMetadata tableMetadata) {
Expand Down
32 changes: 18 additions & 14 deletions core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.scalar.db.storage.common.checker.OperationChecker;
import com.scalar.db.util.ScalarDbUtils;
import com.scalar.db.util.TableMetadataManager;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -30,7 +30,6 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/**
* A storage implementation with DynamoDB for {@link DistributedStorage}
Expand Down Expand Up @@ -82,16 +81,23 @@ public Optional<Result> get(Get get) throws ExecutionException {
TableMetadata metadata = metadataManager.getTableMetadata(get);
ScalarDbUtils.addProjectionsForKeys(get, metadata);

List<Map<String, AttributeValue>> items = selectStatementHandler.handle(get);
if (items.size() > 1) {
throw new IllegalArgumentException("please use scan() for non-exact match selection");
}
if (items.isEmpty() || items.get(0) == null) {
return Optional.empty();
Scanner scanner = null;
try {
scanner = selectStatementHandler.handle(get);
Optional<Result> ret = scanner.one();
if (scanner.one().isPresent()) {
throw new IllegalArgumentException("please use scan() for non-exact match selection");
}
return ret;
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException e) {
LOGGER.warn("failed to close the scanner", e);
}
}
}

return Optional.of(
new ResultInterpreter(get.getProjections(), metadata).interpret(items.get(0)));
}

@Override
Expand All @@ -101,9 +107,7 @@ public Scanner scan(Scan scan) throws ExecutionException {
TableMetadata metadata = metadataManager.getTableMetadata(scan);
ScalarDbUtils.addProjectionsForKeys(scan, metadata);

List<Map<String, AttributeValue>> items = selectStatementHandler.handle(scan);

return new ScannerImpl(items, new ResultInterpreter(scan.getProjections(), metadata));
return selectStatementHandler.handle(scan);
}

@Override
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/com/scalar/db/storage/dynamo/EmptyScanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

public class EmptyScanner implements Scanner {

@Override
public Optional<Result> one() {
return Optional.empty();
}

@Override
public List<Result> all() {
return Collections.emptyList();
}

@Override
public void close() {}

@Override
public Iterator<Result> iterator() {
return Collections.emptyIterator();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import com.scalar.db.storage.common.ScannerIterator;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;

@NotThreadSafe
public final class GetItemScanner implements Scanner {
private final Map<String, AttributeValue> item;
private final ResultInterpreter resultInterpreter;

private boolean hasNext;
private ScannerIterator scannerIterator;

public GetItemScanner(
DynamoDbClient client, GetItemRequest request, ResultInterpreter resultInterpreter) {
GetItemResponse response = client.getItem(request);
if (response.hasItem()) {
item = response.item();
hasNext = true;
} else {
item = null;
hasNext = false;
}
this.resultInterpreter = resultInterpreter;
}

@Override
@Nonnull
public Optional<Result> one() {
if (!hasNext) {
return Optional.empty();
}

Result result = resultInterpreter.interpret(item);
hasNext = false;
return Optional.of(result);
}

@Override
@Nonnull
public List<Result> all() {
if (!hasNext) {
return Collections.emptyList();
}
Result result = resultInterpreter.interpret(item);
hasNext = false;
return Collections.singletonList(result);
}

@Override
@Nonnull
public Iterator<Result> iterator() {
if (scannerIterator == null) {
scannerIterator = new ScannerIterator(this);
}
return scannerIterator;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.Operation;
import static com.google.common.base.Preconditions.checkNotNull;

import com.scalar.db.api.Put;
import com.scalar.db.api.PutIfExists;
import com.scalar.db.api.PutIfNotExists;
Expand All @@ -9,10 +10,7 @@
import com.scalar.db.exception.storage.NoMutationException;
import com.scalar.db.exception.storage.RetriableExecutionException;
import com.scalar.db.util.TableMetadataManager;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand All @@ -27,19 +25,17 @@
* @author Yuji Ito
*/
@ThreadSafe
public class PutStatementHandler extends StatementHandler {
public class PutStatementHandler {
private final DynamoDbClient client;
private final TableMetadataManager metadataManager;

public PutStatementHandler(DynamoDbClient client, TableMetadataManager metadataManager) {
super(client, metadataManager);
this.client = checkNotNull(client);
this.metadataManager = checkNotNull(metadataManager);
}

@Nonnull
@Override
public List<Map<String, AttributeValue>> handle(Operation operation) throws ExecutionException {
checkArgument(operation, Put.class);
Put put = (Put) operation;

TableMetadata tableMetadata = metadataManager.getTableMetadata(operation);
public void handle(Put put) throws ExecutionException {
TableMetadata tableMetadata = metadataManager.getTableMetadata(put);
try {
execute(put, tableMetadata);
} catch (ConditionalCheckFailedException e) {
Expand All @@ -49,8 +45,6 @@ public List<Map<String, AttributeValue>> handle(Operation operation) throws Exec
} catch (DynamoDbException e) {
throw new ExecutionException(e.getMessage(), e);
}

return Collections.emptyList();
}

private void execute(Put put, TableMetadata tableMetadata) {
Expand Down
Loading

0 comments on commit df4e06f

Please sign in to comment.