Skip to content

Commit

Permalink
#282 - Patching documents in parallel to change their folder makes so…
Browse files Browse the repository at this point in the history
…me disappear
  • Loading branch information
mfriesen committed Oct 16, 2024
1 parent 3e0ecbf commit c3cb9ae
Show file tree
Hide file tree
Showing 12 changed files with 654 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,25 @@ Map<String, AttributeValue> updateItem(AttributeValue pk, AttributeValue sk,
*/
Map<String, AttributeValue> updateValues(AttributeValue pk, AttributeValue sk,
Map<String, AttributeValue> updateValues);

/**
* Aquire Lock.
*
* @param pk {@link AttributeValue}
* @param sk {@link AttributeValue}
* @param aquireLockTimeoutInMs long
* @param lockExpirationInMs long
* @return boolean
*/
boolean acquireLock(AttributeValue pk, AttributeValue sk, long aquireLockTimeoutInMs,
long lockExpirationInMs);

/**
* Release Lock.
*
* @param pk {@link AttributeValue}
* @param sk {@link AttributeValue}
* @return boolean
*/
boolean releaseLock(AttributeValue pk, AttributeValue sk);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@

import static com.formkiq.aws.dynamodb.DbKeys.PK;
import static com.formkiq.aws.dynamodb.DbKeys.SK;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.formkiq.aws.dynamodb.objects.Strings;
Expand All @@ -43,13 +46,17 @@
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
Expand All @@ -61,6 +68,13 @@
*/
public final class DynamoDbServiceImpl implements DynamoDbService {

/** Max Backoff in MS. */
private static final int MAX_BACKOFF_IN_MS = 2000;
/** Default Backoff In Ms. */
private static final int DEFAULT_BACKOFF_IN_MS = 200;
/** Thousand constant. */
private static final int TS = 1000;

/** {@link DynamoDbClient}. */
private final DynamoDbClient dbClient;
/** Table Name. */
Expand Down Expand Up @@ -435,4 +449,61 @@ public Map<String, AttributeValue> updateValues(final AttributeValue pk, final A

return updateItem(pk, sk, values);
}

@Override
public boolean acquireLock(final AttributeValue pk, final AttributeValue sk,
final long aquireLockTimeoutInMs, final long lockExpirationInMs) {

boolean lock = false;
long expirationTime = Instant.now().getEpochSecond() + lockExpirationInMs / TS;

Map<String, AttributeValue> item = new HashMap<>();
item.put(PK, pk);
item.put(SK, getLock(sk));
item.put("ExpirationTime", AttributeValue.builder().n(Long.toString(expirationTime)).build());

Put.Builder put = Put.builder().tableName(tableName).item(item).conditionExpression(
"(attribute_not_exists(PK) and attribute_not_exists(SK)) OR ExpirationTime < :currentTime");

long startTime = System.currentTimeMillis();
long waitInterval = DEFAULT_BACKOFF_IN_MS;

while (System.currentTimeMillis() - startTime < aquireLockTimeoutInMs) {

try {

put.expressionAttributeValues(Map.of(":currentTime",
AttributeValue.builder().n(Long.toString(Instant.now().getEpochSecond())).build()));

TransactWriteItemsRequest tx = TransactWriteItemsRequest.builder()
.transactItems(TransactWriteItem.builder().put(put.build()).build()).build();

this.dbClient.transactWriteItems(tx);
lock = true;
break;

} catch (TransactionCanceledException e) {
// Lock is already held or transaction was canceled, wait and retry with exponential backoff
try {
TimeUnit.MILLISECONDS.sleep(waitInterval);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}

// Cap backoff at 1 second
waitInterval = Math.min(waitInterval * 2, MAX_BACKOFF_IN_MS);
}
}

return lock;
}

@Override
public boolean releaseLock(final AttributeValue pk, final AttributeValue sk) {
return deleteItem(pk, getLock(sk));
}

private AttributeValue getLock(final AttributeValue sk) {
return AttributeValue.fromS(sk.s() + ".lock");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* MIT License
*
* Copyright (c) 2018 - 2020 FormKiQ
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.formkiq.aws.dynamodb;

import com.formkiq.testutils.aws.DynamoDbExtension;
import com.formkiq.testutils.aws.DynamoDbTestServices;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import static com.formkiq.testutils.aws.DynamoDbExtension.DOCUMENTS_TABLE;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Unit Tests for {@link DynamoDbService}. */
@ExtendWith(DynamoDbExtension.class)
public class DynamoDbServiceTest {

/** Timeout. */
private static final long TIMEOUT = 20;

/** {@link DynamoDbService}. */
private DynamoDbService service;

/**
* Before Test.
*
* @throws Exception Exception
*/
@BeforeEach
public void before() throws Exception {
this.service =
new DynamoDbServiceImpl(DynamoDbTestServices.getDynamoDbConnection(), DOCUMENTS_TABLE);
}

/**
* Test aquire lock / release lock.
*
*/
@Test
@Timeout(TIMEOUT)
public void testAcquireLock01() {
// given
final long timeout = 2000;
final long lockExpiry = 10000;
AttributeValue pk = AttributeValue.fromS("test");
AttributeValue sk = AttributeValue.fromS("test1");

// when
boolean locked = this.service.acquireLock(pk, sk, timeout, lockExpiry);

// then
assertTrue(locked);
assertTrue(this.service.releaseLock(pk, sk));

// when
locked = this.service.acquireLock(pk, sk, timeout, lockExpiry);

// then
assertTrue(locked);
assertTrue(this.service.releaseLock(pk, sk));
}

/**
* Test aquire lock when already locked.
*/
@Test
@Timeout(TIMEOUT)
public void testAcquireLock02() {
// given
final long timeout = 2000;
final long lockExpiry = 10000;
AttributeValue pk = AttributeValue.fromS("test");
AttributeValue sk = AttributeValue.fromS("test1");

// when
boolean lock1 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
boolean lock2 = this.service.acquireLock(pk, sk, timeout, lockExpiry);

// then
assertFalse(lock2);
assertTrue(lock1);
}

/**
* Test aquire lock after expiration.
*/
@Test
@Timeout(TIMEOUT)
public void testAcquireLock03() {
// given
final long timeout = 5000;
final long lockExpiry = 1000;
AttributeValue pk = AttributeValue.fromS("test");
AttributeValue sk = AttributeValue.fromS("test1");

// when
boolean lock1 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
boolean lock2 = this.service.acquireLock(pk, sk, timeout, lockExpiry);

// then
assertTrue(lock1);
assertTrue(lock2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,34 @@ private void insertAttributeIfExists(final Collection<String> attributeKeys, fin
}
}

private FolderIndexRecord createDocumentPath(final String siteId, final DocumentItem document,
final boolean isPathChanged) {

FolderIndexRecord folderIndexRecord = null;

if (isPathChanged) {
List<FolderIndexRecord> folders =
this.folderIndexProcessor.createFolders(siteId, document.getPath(), document.getUserId());

folderIndexRecord = this.folderIndexProcessor.addFileToFolder(siteId,
document.getDocumentId(), Objects.last(folders), document.getPath());

String filename = Strings.getFilename(document.getPath());
if (!filename.contains(folderIndexRecord.path())) {
String path = createPath(folders, folderIndexRecord);
document.setPath(path);
}
}

return folderIndexRecord;
}

private String createPath(final List<FolderIndexRecord> folders,
final FolderIndexRecord folderIndexRecord) {
return String.join("/", folders.stream().map(FolderIndexRecord::path).toList()) + "/"
+ folderIndexRecord.path();
}

private List<SchemaAttributes> getSchemaAttributes(final String siteId,
final Collection<DocumentAttributeRecord> documentAttributeRecords) {

Expand Down Expand Up @@ -1552,14 +1580,14 @@ private boolean isNextDayPagination(final String siteId, final String dateKey,
* Is Document Path Changed.
*
* @param previous {@link Map}
* @param current {@link Map}
* @param item {@link DocumentItem}
* @return boolean
*/
private boolean isPathChanges(final Map<String, AttributeValue> previous,
final Map<String, AttributeValue> current) {
final DocumentItem item) {
String path0 = previous.containsKey("path") ? previous.get("path").s() : "";
String path1 = current.containsKey("path") ? current.get("path").s() : "";
return !path1.equals(path0) && !"".equals(path0);
String path1 = item.getPath();
return !path1.equals(path0);
}

@Override
Expand Down Expand Up @@ -1796,11 +1824,10 @@ public boolean restoreSoftDeletedDocument(final String siteId, final String docu
DocumentItem item = new DocumentItemDynamoDb(documentId, new Date(), userId);
item.setPath(path);

List<Map<String, AttributeValue>> folderIndex =
this.folderIndexProcessor.generateIndex(siteId, item);
FolderIndexRecord record = createDocumentPath(siteId, item, true);

WriteRequestBuilder writeBuilder =
new WriteRequestBuilder().appends(this.documentTableName, folderIndex);
new WriteRequestBuilder().append(this.documentTableName, record.getAttributes(siteId));

writeBuilder.batchWriteItem(this.dbClient);
}
Expand Down Expand Up @@ -1905,16 +1932,9 @@ private void saveDocument(final Map<String, AttributeValue> keys, final String s

removeNullMetadata(document, documentValues);

// String documentVersionsTableName = this.versionsService.getDocumentVersionsTableName();
// boolean hasDocumentChanged = documentVersionsTableName != null && !previous.isEmpty()
// && isChangedMatching(previous, current);

// if (hasDocumentChanged) {
// this.versionsService.addDocumentVersionAttributes(previous, documentValues);
// }
boolean isPathChanged = isPathChanges(previous, document);

List<Map<String, AttributeValue>> folderIndex =
this.folderIndexProcessor.generateIndex(siteId, document);
FolderIndexRecord folderIndexRecord = createDocumentPath(siteId, document, isPathChanged);
if (!isEmpty(document.getPath())) {
documentValues.put("path", AttributeValue.fromS(document.getPath()));
}
Expand All @@ -1923,23 +1943,25 @@ private void saveDocument(final Map<String, AttributeValue> keys, final String s
getSaveTagsAttributes(siteId, document.getDocumentId(), tags, options.timeToLive());

WriteRequestBuilder writeBuilder = new WriteRequestBuilder()
.append(this.documentTableName, documentValues).appends(this.documentTableName, tagValues)
.appends(this.documentTableName, folderIndex);
.append(this.documentTableName, documentValues).appends(this.documentTableName, tagValues);

if (folderIndexRecord != null) {
writeBuilder.append(this.documentTableName, folderIndexRecord.getAttributes(siteId));
}

appendDocumentAttributes(writeBuilder, siteId, document.getDocumentId(), attributes,
documentExists, AttributeValidation.FULL, options.getValidationAccess());

// if (hasDocumentChanged) {
// writeBuilder = writeBuilder.appends(documentVersionsTableName, List.of(previous));
// }

if (writeBuilder.batchWriteItem(this.dbClient)) {

String documentId = document.getDocumentId();
saveDocumentInterceptor(siteId, documentId, current, previous);

if (isPathChanges(previous, documentValues)) {
this.folderIndexProcessor.deletePath(siteId, documentId, previous.get("path").s());
if (isPathChanged) {
String path = previous.containsKey("path") ? previous.get("path").s() : null;
if (!Strings.isEmpty(path)) {
this.folderIndexProcessor.deletePath(siteId, documentId, previous.get("path").s());
}
}

List<String> tagKeys =
Expand Down
Loading

0 comments on commit c3cb9ae

Please sign in to comment.