Skip to content

Commit

Permalink
Merge pull request #258 from sabbot/main
Browse files Browse the repository at this point in the history
some fixes to aws/dynamo_locking.go
  • Loading branch information
veziak authored May 17, 2023
2 parents b032ed1 + 45a542a commit b849ea8
Showing 1 changed file with 67 additions and 29 deletions.
96 changes: 67 additions & 29 deletions pkg/aws/dynamo_locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,85 @@ package aws

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"os"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
)

const TABLE_NAME = "DiggerDynamoDBLockTable"
const (
TABLE_NAME = "DiggerDynamoDBLockTable"
TableCreationInterval = 1 * time.Second
TableCreationRetryCount = 10
TableLockTimeout = 60 * 60 * 24 * 90 * time.Second
)

type DynamoDbLock struct {
DynamoDb *dynamodb.DynamoDB
}

func isResourceNotFoundExceptionError(err error) bool {
if err != nil {
aerr, ok := err.(awserr.Error)
if !ok {
// not aws error
return false
}

if aerr.Code() == dynamodb.ErrCodeResourceNotFoundException {
// ErrCodeResourceNotFoundException code
return true
}
}
return false
}

func (dynamoDbLock *DynamoDbLock) waitUntilTableCreated() error {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(TABLE_NAME),
}
status, err := dynamoDbLock.DynamoDb.DescribeTable(input)
cnt := 0

for err == nil && *(status.Table.TableStatus) != "ACTIVE" {
time.Sleep(1 * time.Second)
if err != nil {
if !isResourceNotFoundExceptionError(err) {
return err
}
}

for *status.Table.TableStatus != "ACTIVE" {
time.Sleep(TableCreationInterval)
status, err = dynamoDbLock.DynamoDb.DescribeTable(input)
if err != nil {
if !isResourceNotFoundExceptionError(err) {
return err
}
}
cnt++
if cnt > 10 {
if cnt > TableCreationRetryCount {
fmt.Printf("DynamoDB failed to create, timed out during creation.\n" +
"Rerunning the action may cause creation to succeed\n")
os.Exit(1)
}
}
if err != nil {
return err
}

return nil
}

// TODO: refactor func to return actual error and fail on callers
func (dynamoDbLock *DynamoDbLock) createTableIfNotExists() {
input := &dynamodb.DescribeTableInput{
func (dynamoDbLock *DynamoDbLock) createTableIfNotExists() error {
_, err := dynamoDbLock.DynamoDb.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(TABLE_NAME),
}
})

_, err := dynamoDbLock.DynamoDb.DescribeTable(input)
// table already exists
if err == nil {
return
if err != nil {
if !isResourceNotFoundExceptionError(err) {
return err
}
}

createtbl_input := &dynamodb.CreateTableInput{
Expand Down Expand Up @@ -76,23 +108,27 @@ func (dynamoDbLock *DynamoDbLock) createTableIfNotExists() {
TableName: aws.String(TABLE_NAME),
}
_, err = dynamoDbLock.DynamoDb.CreateTable(createtbl_input)
if err != nil && os.Getenv("DEBUG") != "" {
fmt.Printf("%v\n", err)
} else {
err := dynamoDbLock.waitUntilTableCreated()
if err != nil {
if err != nil {
if os.Getenv("DEBUG") != "" {
fmt.Printf("%v\n", err)
}
fmt.Printf("DynamoDB Table %v has been created\n", TABLE_NAME)
return err
}

err = dynamoDbLock.waitUntilTableCreated()
if err != nil {
fmt.Printf("%v\n", err)
return err
}
fmt.Printf("DynamoDB Table %v has been created\n", TABLE_NAME)
return nil
}

func (dynamoDbLock *DynamoDbLock) Lock(transactionId int, resource string) (bool, error) {
dynamoDbLock.createTableIfNotExists()
// TODO: remove timeout completely
timeout := 60 * 60 * 24 * 90
now := time.Now().Format(time.RFC3339)
newTimeout := time.Now().Add(time.Duration(timeout) * time.Second).Format(time.RFC3339)
newTimeout := time.Now().Add(TableLockTimeout).Format(time.RFC3339)

expr, err := expression.NewBuilder().
WithCondition(
Expand Down Expand Up @@ -123,8 +159,10 @@ func (dynamoDbLock *DynamoDbLock) Lock(transactionId int, resource string) (bool

_, err = dynamoDbLock.DynamoDb.UpdateItem(input)
if err != nil {
if err.Error() == dynamodb.ErrCodeConditionalCheckFailedException {
return false, nil
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
return false, nil
}
}
return false, err
}
Expand Down Expand Up @@ -157,12 +195,12 @@ func (dynamoDbLock *DynamoDbLock) GetLock(lockId string) (*int, error) {
if err != nil {
return nil, err
}

if result.Item != nil {
transactionId := result.Item["transaction_id"].N
res, err := strconv.Atoi(*transactionId)
return &res, err
} else {
return nil, nil
}

return nil, nil
}

0 comments on commit b849ea8

Please sign in to comment.