Skip to content

Commit

Permalink
Backport to branch(3.11) : Fix snapshot management issues (#2023)
Browse files (browse the repository at this point in the history)
  • Loading branch information
jnmt authored Jul 2, 2024
1 parent 59b91b7 commit e5fd7e8
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 330 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,15 +12,19 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -68,35 +72,43 @@ public CrudHandler(
this.parallelExecutor = parallelExecutor;
}

// NOTE(review): this span is a rendered diff without +/- markers. The signature, the projection
// copy, and the return statement each appear twice; presumably the first of each pair is the
// removed line and the second is its replacement -- confirm against the commit.
//
// Reads a single record for the given Get and returns it restricted to the projections the
// caller asked for.
//
// The caller's projections are copied up front because prepareStorageSelection clears the
// projections of the selection it is given before the storage read is issued.
//
// Throws CrudException if preparing the selection, reading the record, or building the result
// fails.
public Optional<Result> get(Get get) throws CrudException {
List<String> originalProjections = new ArrayList<>(get.getProjections());
public Optional<Result> get(Get originalGet) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
// From here on, "get" is the storage-ready selection returned by prepareStorageSelection.
Get get = (Get) prepareStorageSelection(originalGet);
Snapshot.Key key = new Snapshot.Key(get);
// Hits the storage only when the snapshot does not already hold this read.
readUnread(key, get);
return createGetResult(key, originalProjections);
return createGetResult(key, get, originalProjections);
}

// NOTE(review): rendered diff without +/- markers. The two guard lines below are alternatives
// (presumably old, then new), not nested conditions -- confirm against the commit.
//
// Calls read(key, get) only when the snapshot does not already contain the record. One guard
// variant checks the read set by key; the other checks the get set by the Get itself.
//
// Throws CrudException if the underlying read fails.
@VisibleForTesting
void readUnread(Snapshot.Key key, Get get) throws CrudException {
if (!snapshot.containsKeyInReadSet(key)) {
if (!snapshot.containsKeyInGetSet(get)) {
read(key, get);
}
}

// NOTE(review): rendered diff without +/- markers. Two signatures are shown: a private one
// followed by its comment, and a package-private @VisibleForTesting one. Presumably the
// latter is the post-change version -- confirm against the commit.
//
// Fetches the record for the given Get from the storage and records the outcome in the
// snapshot. An absent record and a committed record are both stored; any other record causes
// UncommittedRecordException to be thrown with the message "This record needs recovery".
//
// Throws CrudException if the storage read fails (see getFromStorage).
private void read(Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
// An empty result (no record) is stored as well, not just a committed record.
if (!result.isPresent() || result.get().isCommitted()) {
// Keep the read set latest to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, result);
snapshot.put(get, result); // for re-read and validation
return;
}
// Reached only when a record exists and is not committed.
throw new UncommittedRecordException(
get, result.get(), "This record needs recovery", snapshot.getId());
}

// NOTE(review): rendered diff without +/- markers. Two signatures and two chain steps
// (.get(key) / .mergeResult(...)) are shown; presumably the first of each pair is the removed
// line -- confirm against the commit.
//
// Builds the caller-facing result for a Get: resolves the table metadata from the key's
// namespace and table, obtains the result from the snapshot, and wraps it in a FilteredResult
// limited to the given projections. The Optional is empty when the snapshot yields no result.
//
// NOTE(review): mergeResult presumably combines the stored read with this transaction's own
// pending writes -- verify in Snapshot.
//
// Throws CrudException if the table metadata or the snapshot lookup fails.
private Optional<Result> createGetResult(Snapshot.Key key, List<String> projections)
private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String> projections)
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.get(key)
.mergeResult(key, snapshot.get(get))
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -115,20 +127,22 @@ public List<Result> scan(Scan scan) throws CrudException {
return results;
}

private List<Result> scanInternal(Scan scan) throws CrudException {
List<String> originalProjections = new ArrayList<>(scan.getProjections());
private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

List<Result> results = new ArrayList<>();
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<List<Snapshot.Key>> keysInSnapshot = snapshot.get(scan);
if (keysInSnapshot.isPresent()) {
for (Snapshot.Key key : keysInSnapshot.get()) {
snapshot.get(key).ifPresent(results::add);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
}

List<Snapshot.Key> keys = new ArrayList<>();
Scanner scanner = null;
try {
scanner = getFromStorage(scan);
Expand All @@ -141,12 +155,12 @@ private List<Result> scanInternal(Scan scan) throws CrudException {

Snapshot.Key key = new Snapshot.Key(scan, r);

if (!snapshot.containsKeyInReadSet(key)) {
snapshot.put(key, Optional.of(result));
}
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, Optional.of(result));

keys.add(key);
snapshot.get(key).ifPresent(results::add);
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -157,15 +171,16 @@ private List<Result> scanInternal(Scan scan) throws CrudException {
}
}
}
snapshot.put(scan, keys);
snapshot.put(scan, results);

return createScanResults(scan, originalProjections, results);
}

// NOTE(review): rendered diff without +/- markers. One signature takes the results as a
// List<Result>, the other as a Map keyed by Snapshot.Key; likewise results.stream() versus
// results.values().stream(). Presumably the Map-based lines are the post-change version --
// confirm against the commit.
//
// Converts the collected scan results into the list returned to the caller by wrapping each one
// in a FilteredResult limited to the given projections.
//
// Throws CrudException if the table metadata cannot be obtained.
private List<Result> createScanResults(Scan scan, List<String> projections, List<Result> results)
private List<Result> createScanResults(
Scan scan, List<String> projections, Map<Snapshot.Key, TransactionResult> results)
throws CrudException {
// The Optionals are unwrapped without a presence check, so the scan must already carry both a
// namespace and a table name.
TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get());
return results.stream()
return results.values().stream()
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled))
.collect(Collectors.toList());
}
Expand All @@ -182,8 +197,8 @@ public void put(Put put) throws CrudException {
}

if (put.getCondition().isPresent()) {
if (put.isImplicitPreReadEnabled()) {
readUnread(key, createGet(key));
if (put.isImplicitPreReadEnabled() && !snapshot.containsKeyInReadSet(key)) {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
Expand All @@ -196,7 +211,9 @@ public void delete(Delete delete) throws CrudException {
Snapshot.Key key = new Snapshot.Key(delete);

if (delete.getCondition().isPresent()) {
readUnread(key, createGet(key));
if (!snapshot.containsKeyInReadSet(key)) {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
}
Expand Down Expand Up @@ -226,30 +243,21 @@ public void readIfImplicitPreReadEnabled() throws CrudException {
}
}

// NOTE(review): rendered diff without +/- markers. Two signatures and two return statements are
// shown; presumably the variant that declares CrudException and routes the Get through
// prepareStorageSelection is the post-change version -- confirm against the commit.
//
// Builds a Get addressing the record identified by the given snapshot key: namespace, table
// and partition key are always set; the clustering key is set only when the key has one.
private Get createGet(Snapshot.Key key) {
private Get createGet(Snapshot.Key key) throws CrudException {
GetBuilder.BuildableGet buildableGet =
Get.newBuilder()
.namespace(key.getNamespace())
.table(key.getTable())
.partitionKey(key.getPartitionKey());
key.getClusteringKey().ifPresent(buildableGet::clusteringKey);
return buildableGet.build();
return (Get) prepareStorageSelection(buildableGet.build());
}

// Although this class is not thread-safe, this method is actually thread-safe because the storage
// is thread-safe
@VisibleForTesting
Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
try {
get.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames();
get.withProjections(afterImageColumnNames);
}
get.withConsistency(Consistency.LINEARIZABLE);
return storage.get(get).map(TransactionResult::new);
} catch (ExecutionException e) {
throw new CrudException("Get failed", e, snapshot.getId());
Expand All @@ -258,18 +266,26 @@ Optional<TransactionResult> getFromStorage(Get get) throws CrudException {

// NOTE(review): rendered diff without +/- markers. scan.clearProjections() is presumably a
// removed line; the same call on the selection appears in prepareStorageSelection -- confirm
// against the commit.
//
// Issues the scan against the storage and returns the resulting Scanner.
//
// Throws CrudException (message "Scan failed", carrying the snapshot id) when the storage
// raises ExecutionException.
private Scanner getFromStorage(Scan scan) throws CrudException {
try {
scan.clearProjections();
return storage.scan(scan);
} catch (ExecutionException e) {
throw new CrudException("Scan failed", e, snapshot.getId());
}
}

// NOTE(review): rendered diff without +/- markers. Lines that refer to "scan" (including
// "return storage.scan(scan)" and the "Scan failed" throw) are presumably the removed body of
// the former scan-specific method; lines that refer to "selection" are presumably the
// post-change code -- confirm against the commit.
//
// Prepares a Get or Scan for a storage read and returns it. The given selection is modified in
// place and the same instance is returned:
//   - its projections are cleared;
//   - when isIncludeMetadataEnabled is false, its projections are set to the after-image column
//     names from the transaction table metadata;
//   - its consistency is set to LINEARIZABLE.
//
// Throws CrudException (carrying the snapshot id) when an ExecutionException is caught, which
// here can come from looking up the transaction table metadata.
private Selection prepareStorageSelection(Selection selection) throws CrudException {
try {
selection.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames();
scan.withProjections(afterImageColumnNames);
tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames();
selection.withProjections(afterImageColumnNames);
}
scan.withConsistency(Consistency.LINEARIZABLE);
return storage.scan(scan);
selection.withConsistency(Consistency.LINEARIZABLE);
return selection;
} catch (ExecutionException e) {
throw new CrudException("Scan failed", e, snapshot.getId());
throw new CrudException("Getting a table metadata failed", e, snapshot.getId());
}
}

Expand Down
Loading

0 comments on commit e5fd7e8

Please sign in to comment.