From 17906534bf52c7eabb17ebb7fc60da89c972e152 Mon Sep 17 00:00:00 2001
From: hannahhoward
Date: Mon, 14 Oct 2024 23:42:37 -0700
Subject: [PATCH] feat(aws): add aws lambdas and infra

---
 cmd/lambda/getclaims/main.go              |  21 +++
 cmd/lambda/getroot/main.go                |  17 +++
 cmd/lambda/notifier/main.go               |  42 ++++++
 cmd/lambda/postclaims/main.go             |  21 +++
 cmd/lambda/providercache/main.go          |  72 +++++++++
 cmd/lambda/remotesync/main.go             |  69 +++++++++
 go.mod                                    |  30 +++-
 go.sum                                    |  68 +++++++++
 pkg/aws/dynamoprovidercontexttable.go     | 100 +++++++++++++
 pkg/aws/s3store.go                        |  56 +++++++
 pkg/aws/service.go                        | 120 +++++++++++++++
 pkg/aws/snsremotesyncnotifer.go           |  43 ++++++
 pkg/aws/sqscachingqueue.go                | 141 ++++++++++++++++++
 pkg/construct/construct.go                |  73 ++++++---
 .../providerindex/notifier/headstate.go   |  28 ++--
 .../providerindex/notifier/notifier.go    |  40 +++--
 pkg/service/providerindex/store/store.go  |  33 +++-
 17 files changed, 924 insertions(+), 50 deletions(-)
 create mode 100644 cmd/lambda/getclaims/main.go
 create mode 100644 cmd/lambda/getroot/main.go
 create mode 100644 cmd/lambda/notifier/main.go
 create mode 100644 cmd/lambda/postclaims/main.go
 create mode 100644 cmd/lambda/providercache/main.go
 create mode 100644 cmd/lambda/remotesync/main.go
 create mode 100644 pkg/aws/dynamoprovidercontexttable.go
 create mode 100644 pkg/aws/s3store.go
 create mode 100644 pkg/aws/service.go
 create mode 100644 pkg/aws/snsremotesyncnotifer.go
 create mode 100644 pkg/aws/sqscachingqueue.go

diff --git a/cmd/lambda/getclaims/main.go b/cmd/lambda/getclaims/main.go
new file mode 100644
index 0000000..6da85be
--- /dev/null
+++ b/cmd/lambda/getclaims/main.go
@@ -0,0 +1,21 @@
+package main
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/aws/aws-lambda-go/lambda"
+	"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
+	"github.com/storacha/indexing-service/pkg/aws"
+	"github.com/storacha/indexing-service/pkg/server"
+)
+
+func main() {
+	config := aws.FromEnv(context.Background())
+	service, err := aws.Construct(config)
+	if err != nil {
+		panic(err)
+	}
+	handler := server.GetClaimsHandler(service)
+	lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
+}
diff --git a/cmd/lambda/getroot/main.go b/cmd/lambda/getroot/main.go
new file mode 100644
index 0000000..a052b1b
--- /dev/null
+++ b/cmd/lambda/getroot/main.go
@@ -0,0 +1,17 @@
+package main
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/aws/aws-lambda-go/lambda"
+	"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
+	"github.com/storacha/indexing-service/pkg/aws"
+	"github.com/storacha/indexing-service/pkg/server"
+)
+
+func main() {
+	config := aws.FromEnv(context.Background())
+	handler := server.GetRootHandler(config.Signer)
+	lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
+}
diff --git a/cmd/lambda/notifier/main.go b/cmd/lambda/notifier/main.go
new file mode 100644
index 0000000..74e9cf4
--- /dev/null
+++ b/cmd/lambda/notifier/main.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+	"context"
+	"time"
+
+	"github.com/aws/aws-lambda-go/events"
+	"github.com/aws/aws-lambda-go/lambda"
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/storacha/indexing-service/pkg/aws"
+	"github.com/storacha/indexing-service/pkg/service/providerindex/notifier"
+)
+
+var log = logging.Logger("lambda/notifier")
+
+func makeHandler(notifier *notifier.Notifier) func(ctx context.Context, event events.EventBridgeEvent) {
+	return func(ctx context.Context, event events.EventBridgeEvent) {
+		synced, ts, err := notifier.Update(ctx)
+		if err != nil {
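+			// on failure, just log and return: the next scheduled EventBridge
+			// invocation will run the head check again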
+ log.Errorf("error during notifier sync head check: %s", err.Error()) + return + } + if !synced { + log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(ts)) + } + } +} + +func main() { + config := aws.FromEnv(context.Background()) + // setup IPNI + // TODO: switch to double hashed client for reader privacy? + headStore := aws.NewS3Store(config.Config, config.NotifierHeadBucket, "") + notifier, err := notifier.NewNotifierWithStorage(config.IndexerURL, config.PrivateKey, headStore) + if err != nil { + panic(err) + } + sqsRemoteSyncNotifier := aws.NewSQSRemoteSyncNotifier(config.Config, config.NotifierTopicArn) + notifier.Notify(sqsRemoteSyncNotifier.NotifyRemoteSync) + + lambda.Start(makeHandler(notifier)) +} diff --git a/cmd/lambda/postclaims/main.go b/cmd/lambda/postclaims/main.go new file mode 100644 index 0000000..28d6f84 --- /dev/null +++ b/cmd/lambda/postclaims/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "context" + "net/http" + + "github.com/aws/aws-lambda-go/lambda" + "github.com/awslabs/aws-lambda-go-api-proxy/httpadapter" + "github.com/storacha/indexing-service/pkg/aws" + "github.com/storacha/indexing-service/pkg/server" +) + +func main() { + config := aws.FromEnv(context.Background()) + service, err := aws.Construct(config) + if err != nil { + panic(err) + } + handler := server.PostClaimsHandler(config.Signer, service) + lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext) +} diff --git a/cmd/lambda/providercache/main.go b/cmd/lambda/providercache/main.go new file mode 100644 index 0000000..6e41b95 --- /dev/null +++ b/cmd/lambda/providercache/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "context" + "errors" + "sync" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + logging "github.com/ipfs/go-log/v2" + goredis "github.com/redis/go-redis/v9" + "github.com/storacha/indexing-service/pkg/aws" + "github.com/storacha/indexing-service/pkg/redis" + "github.com/storacha/indexing-service/pkg/service/providercacher" +) + +var log = logging.Logger("lambda/providercache") + +func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error { + job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body) + if err != nil { + return err + } + _, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index) + if err != nil { + return err + } + return nil +} + +func makeHandler(sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher) func(ctx context.Context, sqsEvent events.SQSEvent) error { + return func(ctx context.Context, sqsEvent events.SQSEvent) error { + // process messages in parallel + results := make(chan error, len(sqsEvent.Records)) + var wg sync.WaitGroup + for _, msg := range sqsEvent.Records { + wg.Add(1) + go func(msg events.SQSMessage) { + defer wg.Done() + err := handleMessage(ctx, sqsCachingDecoder, providerCacher, msg) + results <- err + }(msg) + } + wg.Wait() + // collect errors + close(results) + var err error + for nextErr := range results { + err = errors.Join(err, nextErr) + } + // return overall error + if err != nil { + return err + } + for _, msg := range sqsEvent.Records { + err := sqsCachingDecoder.CleanupMessage(ctx, msg.Body) + if err != nil { + log.Warnf("unable to cleanup message fully: %s", err.Error()) + } + } + return nil + } +} + +func main() { + config := aws.FromEnv(context.Background()) + providerRedis := 
goredis.NewClient(&config.ProvidersRedis) + providerStore := redis.NewProviderStore(providerRedis) + providerCacher := providercacher.NewSimpleProviderCacher(providerStore) + sqsCachingDecoder := aws.NewSQSCachingDecoder(config.Config, config.CachingBucket) + lambda.Start(makeHandler(sqsCachingDecoder, providerCacher)) +} diff --git a/cmd/lambda/remotesync/main.go b/cmd/lambda/remotesync/main.go new file mode 100644 index 0000000..73d4cd2 --- /dev/null +++ b/cmd/lambda/remotesync/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "encoding/json" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + goredis "github.com/redis/go-redis/v9" + "github.com/storacha/indexing-service/pkg/aws" + "github.com/storacha/indexing-service/pkg/redis" + "github.com/storacha/indexing-service/pkg/service/providercacher" + "github.com/storacha/indexing-service/pkg/service/providerindex" + "github.com/storacha/indexing-service/pkg/service/providerindex/store" +) + +var log = logging.Logger("lambda/providercache") + +func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error { + job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body) + if err != nil { + return err + } + _, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index) + if err != nil { + return err + } + return nil +} + +func makeHandler(remoteSyncer *providerindex.RemoteSyncer) func(ctx context.Context, snsEvent events.SNSEvent) error { + return func(ctx context.Context, snsEvent events.SNSEvent) error { + for _, record := range snsEvent.Records { + snsRecord := record.SNS + var snsRemoteSyncMessage aws.SNSRemoteSyncMessage + err := json.Unmarshal([]byte(snsRecord.Message), &snsRemoteSyncMessage) + if err != nil { + return err + } + headCid, err := cid.Parse(snsRemoteSyncMessage.Head) + if err != nil { + return err + } + head := cidlink.Link{Cid: headCid} + prevCid, err := cid.Parse(snsRemoteSyncMessage.Prev) + if err != nil { + return err + } + prev := cidlink.Link{Cid: prevCid} + remoteSyncer.HandleRemoteSync(ctx, head, prev) + } + return nil + } +} + +func main() { + cfg := aws.FromEnv(context.Background()) + providerRedis := goredis.NewClient(&cfg.ProvidersRedis) + providerStore := redis.NewProviderStore(providerRedis) + ipniStore := aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix) + chunkLinksTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName) + metadataTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName) + publisherStore := store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable) + remoteSyncer := providerindex.NewRemoteSyncer(providerStore, publisherStore) + lambda.Start(makeHandler(remoteSyncer)) +} diff --git a/go.mod b/go.mod index bd69812..1a084a1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23 toolchain go1.23.0 require ( + github.com/aws/aws-sdk-go-v2 v1.32.2 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-ds-flatfs v0.5.1 github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e @@ -21,8 +22,26 @@ require ( require ( github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.41 // indirect + 
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect + github.com/aws/smithy-go v1.22.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pion/ice/v2 v2.3.35 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -32,6 +51,15 @@ require ( ) require ( + github.com/aws/aws-lambda-go v1.47.0 + github.com/aws/aws-sdk-go-v2/config v1.27.43 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2 + github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2 + github.com/aws/aws-sdk-go-v2/service/sns v1.33.2 + github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2 + github.com/awslabs/aws-lambda-go-api-proxy v0.16.2 github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect @@ -40,7 +68,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect diff --git a/go.sum b/go.sum index b85c618..a44bbc6 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,60 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1sXVI= +github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= +github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI= +github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA= 
+github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU= +github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= +github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= +github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12 h1:zYf8E8zaqolHA5nQ+VmX2r3wc4K6xw5i6xKvvMjZBL0= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12/go.mod h1:vYGIVLASk19Gb0FGwAcwES+qQF/aekD7m2G/X6mBOdQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 h1:7edmS3VOBDhK00b/MwGtGglCm7hhwNYnjJs/PgFdMQE= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21/go.mod h1:Q9o5h4HoIWG8XfzxqiuK/CGUbepCJ8uTlaE3bAbxytQ= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2 h1:kJqyYcGqhWFmXqjRrtFFD4Oc9FXiskhsll2xnlpe8Do= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2/go.mod h1:+t2Zc5VNOzhaWzpGE+cEYZADsgAAQT5v55AO+fhU+2s= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2 h1:E7Tuo0ipWpBl0f3uThz8cZsuyD5H8jLCnbtbKR4YL2s= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2/go.mod h1:txOfweuNPBLhHodsV+C2lvPPRTommVTWbts9SZV6Myc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 h1:4FMHqLfk0efmTqhXVRL5xYRqlEBNBiRI7N6w4jsEdd4= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2/go.mod h1:LWoqeWlK9OZeJxsROW2RqrSPvQHKTpp69r/iDjwsSaw= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.2 h1:1G7TTQNPNv5fhCyIQGYk8FOggLgkzKq6c4Y1nOGzAOE= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.2/go.mod h1:+ybYGLXoF7bcD7wIcMcklxyABZQmuBf1cHUhvY6FGIo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 h1:s7NA1SOw8q/5c0wr8477yOPp0z+uBaXBnLE0XYb0POA= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2/go.mod h1:fnjjWyAW/Pj5HYOxl9LJqWtEwS7W2qgcRLWP+uWbss0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 h1:t7iUP9+4wdc5lt3E41huP+GvQZJD38WLsgVp4iOtAjg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2/go.mod h1:/niFCtmuQNxqx9v8WAPq5qh7EH25U4BF6tjoyq9bObM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 h1:xxHGZ+wUgZNACQmxtdvP5tgzfsxGS3vPpTP5Hy3iToE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3/go.mod h1:cB6oAuus7YXRZhWCc1wIwPywwZ1XwweNp2TVAEGYeB8= 
+github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2 h1:Rrqru2wYkKQCS2IM5/JrgKUQIoNTqA6y/iuxkjzxC6M= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2/go.mod h1:QuCURO98Sqee2AXmqDNxKXYFm2OEDAVAPApMqO0Vqnc= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.2 h1:GeVRrB1aJsGdXxdPY6VOv0SWs+pfdeDlKgiBxi0+V6I= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.2/go.mod h1:c6Sj8zleZXYs4nyU3gpDKTzPWu7+t30YUXoLYRpbUvU= +github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2 h1:kmbcoWgbzfh5a6rvfjOnfHSGEqD13qu1GfTPRZqg0FI= +github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2/go.mod h1:/UPx74a3M0WYeT2yLQYG/qHhkPlPXd6TsppfGgy2COk= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/awslabs/aws-lambda-go-api-proxy v0.16.2 h1:CJyGEyO1CIwOnXTU40urf0mchf6t3voxpvUDikOU9LY= +github.com/awslabs/aws-lambda-go-api-proxy v0.16.2/go.mod h1:vxxjwBHe/KbgFeNlAP/Tvp4SsVRL3WQamcWRxqVh0z0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -108,6 +162,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -313,6 +369,10 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= 
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -419,8 +479,14 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= +github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= +github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.20.0 h1:PE84V2mHqoT1sglvHc8ZdQtPcwmvvt29WLEEO3xmdZw= github.com/onsi/ginkgo/v2 v2.20.0/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= +github.com/onsi/gomega v1.27.7 h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU= +github.com/onsi/gomega v1.27.7/go.mod h1:1p8OOlwo2iUUDsHnOrjE5UKYJ+e3W8eQ3qSlRahPmr4= github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk= github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -991,6 +1057,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/aws/dynamoprovidercontexttable.go b/pkg/aws/dynamoprovidercontexttable.go new file mode 100644 index 0000000..6327ae3 --- /dev/null +++ b/pkg/aws/dynamoprovidercontexttable.go @@ -0,0 +1,100 @@ +package aws + +import ( + "context" + "errors" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/storacha/indexing-service/pkg/service/providerindex/store" +) + +// ErrDynamoRecordNotFound is used when there is no record in a dynamo table +// (given that GetItem does not actually error) +var ErrDynamoRecordNotFound = errors.New("no record found in dynamo table") + +// DynamoProviderContextTable implements the store.ProviderContextTable interface on dynamodb +type DynamoProviderContextTable struct { + tableName string + dynamoDbClient *dynamodb.Client +} + 
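+// Items are addressed by a composite primary key of provider and contextID; see
+// providerContextItem.GetKey below.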
+var _ store.ProviderContextTable = (*DynamoProviderContextTable)(nil)
+
+// NewDynamoProviderContextTable returns a ProviderContextTable connected to an AWS DynamoDB table
+func NewDynamoProviderContextTable(cfg aws.Config, tableName string) *DynamoProviderContextTable {
+	return &DynamoProviderContextTable{
+		tableName:      tableName,
+		dynamoDbClient: dynamodb.NewFromConfig(cfg),
+	}
+}
+
+// Delete implements store.ProviderContextTable.
+func (d *DynamoProviderContextTable) Delete(ctx context.Context, p peer.ID, contextID []byte) error {
+	providerContextItem := providerContextItem{p.String(), contextID, nil}
+	_, err := d.dynamoDbClient.DeleteItem(ctx, &dynamodb.DeleteItemInput{
+		TableName: aws.String(d.tableName), Key: providerContextItem.GetKey(),
+	})
+	return err
+}
+
+// Get implements store.ProviderContextTable.
+func (d *DynamoProviderContextTable) Get(ctx context.Context, p peer.ID, contextID []byte) ([]byte, error) {
+	providerContextItem := providerContextItem{p.String(), contextID, nil}
+	response, err := d.dynamoDbClient.GetItem(ctx, &dynamodb.GetItemInput{
+		Key:                  providerContextItem.GetKey(),
+		TableName:            aws.String(d.tableName),
+		ProjectionExpression: aws.String("Data"),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("retrieving item: %w", err)
+	}
+	if response.Item == nil {
+		return nil, store.NewErrNotFound(ErrDynamoRecordNotFound)
+	}
+	err = attributevalue.UnmarshalMap(response.Item, &providerContextItem)
+	if err != nil {
+		return nil, fmt.Errorf("deserializing item: %w", err)
+	}
+	return providerContextItem.Data, nil
+}
+
+// Put implements store.ProviderContextTable.
+func (d *DynamoProviderContextTable) Put(ctx context.Context, p peer.ID, contextID []byte, data []byte) error {
+	item, err := attributevalue.MarshalMap(providerContextItem{
+		Provider:  p.String(),
+		ContextID: contextID,
+		Data:      data,
+	})
+	if err != nil {
+		return fmt.Errorf("serializing item: %w", err)
+	}
+	_, err = d.dynamoDbClient.PutItem(ctx, &dynamodb.PutItemInput{
+		TableName: aws.String(d.tableName), Item: item,
+	})
+	if err != nil {
+		return fmt.Errorf("storing item: %w", err)
+	}
+	return nil
+}
+
+type providerContextItem struct {
+	Provider  string `dynamodbav:"provider"`
+	ContextID []byte `dynamodbav:"contextID"`
+	Data      []byte `dynamodbav:"data"`
+}
+
+// GetKey returns the composite primary key of the provider & contextID in a format that can be
+// sent to DynamoDB.
+func (p providerContextItem) GetKey() map[string]types.AttributeValue {
+	provider, err := attributevalue.Marshal(p.Provider)
+	if err != nil {
+		panic(err)
+	}
+	contextID, err := attributevalue.Marshal(p.ContextID)
+	if err != nil {
+		panic(err)
+	}
+	return map[string]types.AttributeValue{"provider": provider, "contextID": contextID}
+}
diff --git a/pkg/aws/s3store.go b/pkg/aws/s3store.go
new file mode 100644
index 0000000..8bbff18
--- /dev/null
+++ b/pkg/aws/s3store.go
@@ -0,0 +1,56 @@
+package aws
+
+import (
+	"context"
+	"errors"
+	"io"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/storacha/indexing-service/pkg/service/providerindex/store"
+)
+
+// S3Store implements the store.Store interface on S3
+type S3Store struct {
+	bucket    string
+	keyPrefix string
+	s3Client  *s3.Client
+}
+
+var _ store.Store = (*S3Store)(nil)
+
+// Get implements store.Store.
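+// S3 NoSuchKey errors are wrapped with store.NewErrNotFound so callers can
+// detect missing keys via store.IsNotFound.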
+func (s *S3Store) Get(ctx context.Context, key string) (io.ReadCloser, error) { + outPut, err := s.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(s.keyPrefix + key), + }) + if err != nil { + var noSuchKeyError *types.NoSuchKey + // wrap in error recognizable as a not found error for Store interface consumers + if errors.As(err, &noSuchKeyError) { + return nil, store.NewErrNotFound(err) + } + return nil, err + } + return outPut.Body, nil +} + +// Put implements store.Store. +func (s *S3Store) Put(ctx context.Context, key string, data io.Reader) error { + _, err := s.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(s.keyPrefix + key), + Body: data, + }) + return err +} + +func NewS3Store(cfg aws.Config, bucket string, keyPrefix string) *S3Store { + return &S3Store{ + s3Client: s3.NewFromConfig(cfg), + bucket: bucket, + keyPrefix: keyPrefix, + } +} diff --git a/pkg/aws/service.go b/pkg/aws/service.go new file mode 100644 index 0000000..6d61b42 --- /dev/null +++ b/pkg/aws/service.go @@ -0,0 +1,120 @@ +package aws + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/secretsmanager" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/redis/go-redis/v9" + "github.com/storacha/go-ucanto/principal" + ed25519 "github.com/storacha/go-ucanto/principal/ed25519/signer" + "github.com/storacha/indexing-service/pkg/construct" + "github.com/storacha/indexing-service/pkg/service/providerindex/store" + "github.com/storacha/indexing-service/pkg/types" +) + +// ErrNoPrivateKey means that the value returned from Secrets was empty +var ErrNoPrivateKey = errors.New("no value for private key") + +func mustGetEnv(envVar string) string { + value := os.Getenv(envVar) + if len(value) == 0 { + panic(fmt.Errorf("missing env var: %s", envVar)) + } + return value +} + +// Config describes all the values required to setup AWS from the environment +type Config struct { + construct.ServiceConfig + aws.Config + SQSCachingQueueURL string + CachingBucket string + ChunkLinksTableName string + MetadataTableName string + IPNIStoreBucket string + IPNIStorePrefix string + NotifierHeadBucket string + NotifierTopicArn string + principal.Signer +} + +// FromEnv constructs the AWS Configuration from the environment +func FromEnv(ctx context.Context) Config { + awsConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + panic(fmt.Errorf("loading aws default config: %w", err)) + } + secretsClient := secretsmanager.NewFromConfig(awsConfig) + response, err := secretsClient.GetSecretValue(ctx, &secretsmanager.GetSecretValueInput{ + SecretId: aws.String(mustGetEnv("PRIVATE_KEY")), + }) + if err != nil { + panic(fmt.Errorf("retrieving private key: %w", err)) + } + if response.SecretString == nil { + panic(ErrNoPrivateKey) + } + id, err := ed25519.Parse(*response.SecretString) + if err != nil { + panic(fmt.Errorf("parsing private key: %s", err)) + } + cryptoPrivKey, err := crypto.UnmarshalEd25519PrivateKey(id.Raw()) + if err != nil { + panic(fmt.Errorf("unmarshaling private key: %w", err)) + } + + ipniStoreKeyPrefix := os.Getenv("IPNI_STORE_KEY_PREFIX") + if len(ipniStoreKeyPrefix) == 0 { + ipniStoreKeyPrefix = "/ipni/v1/ad/" + } + + return Config{ + Config: awsConfig, + Signer: id, + ServiceConfig: construct.ServiceConfig{ + PrivateKey: cryptoPrivKey, + ProvidersRedis: redis.Options{ + Addr: mustGetEnv("PROVIDER_REDIS_URL"), 
+ Password: mustGetEnv("PROVIDER_REDIS_PASSWD"), + }, + ClaimsRedis: redis.Options{ + Addr: mustGetEnv("CLAIMS_REDIS_URL"), + Password: mustGetEnv("CLAIMS_REDIS_PASSWD"), + }, + IndexesRedis: redis.Options{ + Addr: mustGetEnv("INDEXES_REDIS_URL"), + Password: mustGetEnv("INDEXES_REDIS_PASSWD"), + }, + IndexerURL: mustGetEnv("IPNI_ENDPOINT"), + PublisherAnnounceAddrs: []string{mustGetEnv("IPNI_PUBLISHER_ANNOUNCE_ADDRESS")}, + }, + SQSCachingQueueURL: mustGetEnv("PROVIDER_CACHING_QUEUE_URL"), + CachingBucket: mustGetEnv("PROVIDER_CACHING_BUCKET_NAME"), + ChunkLinksTableName: mustGetEnv("CHUNK_LINKS_TABLE_NAME"), + MetadataTableName: mustGetEnv("METADATA_TABLE_NAME"), + IPNIStoreBucket: mustGetEnv("IPNI_STORE_BUCKET_NAME"), + IPNIStorePrefix: ipniStoreKeyPrefix, + NotifierHeadBucket: mustGetEnv("NOTIFIER_HEAD_BUCKET_NAME"), + NotifierTopicArn: mustGetEnv("NOTIFIER_SNS_TOPIC_ARN"), + } +} + +// Construct constructs types.Service from AWS deps for Lamda functions +func Construct(cfg Config) (types.Service, error) { + cachingQueue := NewSQSCachingQueue(cfg.Config, cfg.SQSCachingQueueURL, cfg.CachingBucket) + ipniStore := NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix) + chunkLinksTable := NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName) + metadataTable := NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName) + publisherStore := store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable) + return construct.Construct(cfg.ServiceConfig, + construct.SkipNotification(), + construct.WithCachingQueue(cachingQueue), + construct.WithPublisherStore(publisherStore), + construct.WithStartIPNIServer(false)) +} diff --git a/pkg/aws/snsremotesyncnotifer.go b/pkg/aws/snsremotesyncnotifer.go new file mode 100644 index 0000000..6dbbf2d --- /dev/null +++ b/pkg/aws/snsremotesyncnotifer.go @@ -0,0 +1,43 @@ +package aws + +import ( + "context" + "encoding/json" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sns" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" +) + +var log = logging.Logger("aws") + +type SNSRemoteSyncMessage struct { + Head string `json:"Head,omitempty"` + Prev string `json:"Prev,omitempty"` +} + +type SNSRemoteSyncNotifier struct { + topicArn string + snsClient *sns.Client +} + +func NewSQSRemoteSyncNotifier(config aws.Config, topicArn string) *SNSRemoteSyncNotifier { + return &SNSRemoteSyncNotifier{ + snsClient: sns.NewFromConfig(config), + topicArn: topicArn, + } +} +func (s *SNSRemoteSyncNotifier) NotifyRemoteSync(ctx context.Context, head, prev ipld.Link) { + messageJSON, err := json.Marshal(SNSRemoteSyncMessage{ + Head: head.String(), + Prev: prev.String(), + }) + if err != nil { + log.Errorf("serializing remote sync message: %s", err.Error()) + } + _, err = s.snsClient.Publish(ctx, &sns.PublishInput{TopicArn: aws.String(s.topicArn), Message: aws.String(string(messageJSON))}) + if err != nil { + log.Errorf("serializing remote sync message: %s", err.Error()) + } +} diff --git a/pkg/aws/sqscachingqueue.go b/pkg/aws/sqscachingqueue.go new file mode 100644 index 0000000..59c5543 --- /dev/null +++ b/pkg/aws/sqscachingqueue.go @@ -0,0 +1,141 @@ +package aws + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/google/uuid" + "github.com/ipni/go-libipni/find/model" + "github.com/storacha/indexing-service/pkg/blobindex" + 
"github.com/storacha/indexing-service/pkg/service/blobindexlookup" + "github.com/storacha/indexing-service/pkg/service/providercacher" +) + +// CachingQueueMessage is the struct that is serialized onto an SQS message queue in JSON +type CachingQueueMessage struct { + JobID uuid.UUID `json:"JobID,omitempty"` + Provider model.ProviderResult `json:"Provider,omitempty"` +} + +// SQSCachingQueue implements the providercacher.CachingQueue interface using SQS +type SQSCachingQueue struct { + queueURL string + bucket string + s3Client *s3.Client + sqsClient *sqs.Client +} + +// NewSQSCachingQueue returns a new SQSCachingQueue for the given aws config +func NewSQSCachingQueue(cfg aws.Config, queurURL string, bucket string) *SQSCachingQueue { + return &SQSCachingQueue{ + queueURL: queurURL, + bucket: bucket, + s3Client: s3.NewFromConfig(cfg), + sqsClient: sqs.NewFromConfig(cfg), + } +} + +// Queue implements blobindexlookup.CachingQueue. +func (s *SQSCachingQueue) Queue(ctx context.Context, job providercacher.ProviderCachingJob) error { + uuid := uuid.New() + index, err := job.Index.Archive() + if err != nil { + return fmt.Errorf("serializing index to CAR: %w", err) + } + _, err = s.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(uuid.String()), + Body: index, + }) + if err != nil { + return fmt.Errorf("saving index CAR to S3: %w", err) + } + err = s.sendMessage(ctx, CachingQueueMessage{ + JobID: uuid, + Provider: job.Provider, + }) + if err != nil { + // error sending message so cleanup queue + _, s3deleteErr := s.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(uuid.String()), + }) + if s3deleteErr != nil { + err = errors.Join(err, fmt.Errorf("cleaning up index CAR on S3: %w", s3deleteErr)) + } + } + return err +} + +func (s *SQSCachingQueue) sendMessage(ctx context.Context, msg CachingQueueMessage) error { + + messageJSON, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("serializing message json: %w", err) + } + _, err = s.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{ + QueueUrl: aws.String(s.queueURL), + MessageBody: aws.String(string(messageJSON)), + }) + if err != nil { + return fmt.Errorf("enqueueing message: %w", err) + } + return nil +} + +var _ blobindexlookup.CachingQueue = (*SQSCachingQueue)(nil) + +// SQSCachingDecoder provides interfaces for working with caching jobs received over SQS +type SQSCachingDecoder struct { + bucket string + s3Client *s3.Client +} + +// NewSQSCachingDecoder returns a new decoder for the given AWS config +func NewSQSCachingDecoder(cfg aws.Config, bucket string) *SQSCachingDecoder { + return &SQSCachingDecoder{ + bucket: bucket, + s3Client: s3.NewFromConfig(cfg), + } +} + +// DecodeMessage decodes a provider caching job from the SQS message body, reading the stored index from S3 +func (s *SQSCachingDecoder) DecodeMessage(ctx context.Context, messageBody string) (providercacher.ProviderCachingJob, error) { + var msg CachingQueueMessage + err := json.Unmarshal([]byte(messageBody), &msg) + if err != nil { + return providercacher.ProviderCachingJob{}, fmt.Errorf("deserializing message: %w", err) + } + received, err := s.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(msg.JobID.String()), + }) + if err != nil { + return providercacher.ProviderCachingJob{}, fmt.Errorf("reading stored index CAR: %w", err) + } + defer received.Body.Close() + index, err := blobindex.Extract(received.Body) + if err != nil { + 
+		return providercacher.ProviderCachingJob{}, fmt.Errorf("deserializing index: %w", err)
+	}
+	return providercacher.ProviderCachingJob{Provider: msg.Provider, Index: index}, nil
+}
+
+// CleanupMessage removes the stored index CAR for a job from the S3 bucket
+func (s *SQSCachingDecoder) CleanupMessage(ctx context.Context, messageBody string) error {
+	var msg CachingQueueMessage
+	err := json.Unmarshal([]byte(messageBody), &msg)
+	if err != nil {
+		return fmt.Errorf("deserializing message: %w", err)
+	}
+	_, err = s.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(msg.JobID.String()),
+	})
+	return err
+}
diff --git a/pkg/construct/construct.go b/pkg/construct/construct.go
index 10eaaf8..fb18cbc 100644
--- a/pkg/construct/construct.go
+++ b/pkg/construct/construct.go
@@ -29,7 +29,7 @@ import (
 var log = logging.Logger("service")
 
 var providerIndexNamespace = datastore.NewKey("providerindex/")
-var providerIndexPublisherNamespace = providerIndexNamespace.ChildString("publisher/")
+var providerIndexPublisherNamespace = providerIndexNamespace.Child(datastore.NewKey("publisher/"))
 
 // ServiceConfig sets specific config values for the service
 type ServiceConfig struct {
@@ -56,6 +56,8 @@ type config struct {
 	opts             []service.Option
 	ds               datastore.Batching
 	skipNotification bool
+	startIPNIServer  bool
+	publisherStore   store.PublisherStore
 }
 
 // Option configures how the node is construct
@@ -85,6 +87,24 @@ func SkipNotification() Option {
 	}
 }
 
+// WithStartIPNIServer determines whether IPNI ads will be served directly over HTTP.
+// Defaults to true.
+func WithStartIPNIServer(startIPNIServer bool) Option {
+	return func(cfg *config) error {
+		cfg.startIPNIServer = startIPNIServer
+		return nil
+	}
+}
+
+// WithPublisherStore overrides the store used for IPNI advertisements.
+// Unless combined with WithStartIPNIServer(false), the store must also implement store.EncodeableStore.
+func WithPublisherStore(publisherStore store.PublisherStore) Option {
+	return func(cfg *config) error {
+		cfg.publisherStore = publisherStore
+		return nil
+	}
+}
+
 // WithDataPath constructs a flat FS datastore at the specified path to use for ads
 func WithDataPath(dataPath string) Option {
 	return func(cfg *config) error {
@@ -183,45 +203,53 @@ func Construct(sc ServiceConfig, opts ...Option) (Service, error) {
 		return nil, err
 	}
 
-	ds := cfg.ds
-	if ds == nil {
-		log.Warnf("datastore not configured, using in-memory store")
-		ds = dssync.MutexWrap(datastore.NewMapDatastore())
+	var ds datastore.Batching
+	publisherStore := cfg.publisherStore
+	if publisherStore == nil {
+		ds = initializeDatastore(&cfg)
+		// setup the datastore for publishing to IPNI
+		publisherStore = store.FromDatastore(namespace.Wrap(ds, providerIndexPublisherNamespace))
 	}
 
-	// setup the datastore for publishing to IPNI
-	store := store.FromDatastore(namespace.Wrap(ds, providerIndexPublisherNamespace))
-
 	// setup remote sync notification
 	if !cfg.skipNotification {
-		notifier, err := notifier.NewNotifierWithStorage(sc.IndexerURL, sc.PrivateKey, namespace.Wrap(ds, providerIndexNamespace))
+		// initialize datastore if not already initialized
+		if ds == nil {
+			ds = initializeDatastore(&cfg)
+		}
+		notifier, err := notifier.NewNotifierWithStorage(sc.IndexerURL, sc.PrivateKey, store.SimpleStoreFromDatastore(namespace.Wrap(ds, providerIndexNamespace)))
 		if err != nil {
 			return nil, fmt.Errorf("creating IPNI remote sync notifier: %w", err)
 		}
 		s.startupFuncs = append(s.startupFuncs, func(ctx context.Context) error { notifier.Start(ctx); return nil })
s.shutdownFuncs = append(s.shutdownFuncs, func(context.Context) error { notifier.Stop(); return nil }) // Setup handling ipni remote sync notifications - notifier.Notify(providerindex.NewRemoteSyncer(providersCache, store).HandleRemoteSync) + notifier.Notify(providerindex.NewRemoteSyncer(providersCache, publisherStore).HandleRemoteSync) } publisher, err := publisher.New( sc.PrivateKey, - store, - publisher.WithDirectAnnounce("https://cid.contact"), + publisherStore, + publisher.WithDirectAnnounce(sc.IndexerURL), publisher.WithAnnounceAddrs(sc.PublisherAnnounceAddrs...), ) if err != nil { return nil, fmt.Errorf("creating IPNI publisher: %w", err) } - srv, err := server.NewServer(store, server.WithHTTPListenAddrs(sc.PublisherListenAddr)) - if err != nil { - return nil, fmt.Errorf("creating server for IPNI ads: %w", err) + if cfg.startIPNIServer { + encodableStore, ok := publisherStore.(store.EncodeableStore) + if !ok { + return nil, fmt.Errorf("publisher store is incompatible with serving over HTTP (must implement store.EncodableStore)") + } + srv, err := server.NewServer(encodableStore, server.WithHTTPListenAddrs(sc.PublisherListenAddr)) + if err != nil { + return nil, fmt.Errorf("creating server for IPNI ads: %w", err) + } + s.startupFuncs = append(s.startupFuncs, srv.Start) + s.shutdownFuncs = append(s.shutdownFuncs, srv.Shutdown) } - s.startupFuncs = append(s.startupFuncs, srv.Start) - s.shutdownFuncs = append(s.shutdownFuncs, srv.Shutdown) - // build read through fetchers // TODO: add sender / publisher / linksystem / legacy systems providerIndex := providerindex.NewProviderIndex(providersCache, findClient, publisher, nil) @@ -239,3 +267,12 @@ func Construct(sc ServiceConfig, opts ...Option) (Service, error) { return s, nil } + +func initializeDatastore(cfg *config) datastore.Batching { + ds := cfg.ds + if ds == nil { + log.Warnf("datastore not configured, using in-memory store") + ds = dssync.MutexWrap(datastore.NewMapDatastore()) + } + return ds +} diff --git a/pkg/service/providerindex/notifier/headstate.go b/pkg/service/providerindex/notifier/headstate.go index 3b86cc9..29fc799 100644 --- a/pkg/service/providerindex/notifier/headstate.go +++ b/pkg/service/providerindex/notifier/headstate.go @@ -1,41 +1,47 @@ package notifier import ( + "bytes" "context" - "errors" "fmt" + "io" cid "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/storacha/go-ucanto/core/ipld" + "github.com/storacha/indexing-service/pkg/service/providerindex/store" ) -var remoteHeadPrefix = datastore.NewKey("head/remote/") +const remoteHeadPrefix = "head/remote/" type HeadState struct { - ds datastore.Batching - hdkey datastore.Key + ds store.Store + hdkey string cached ipld.Link } -func NewHeadState(ds datastore.Batching, hostname string) (*HeadState, error) { +func NewHeadState(ds store.Store, hostname string) (*HeadState, error) { var hd ipld.Link - hdkey := remoteHeadPrefix.ChildString(hostname) - v, err := ds.Get(context.Background(), hdkey) + hdkey := remoteHeadPrefix + hostname + r, err := ds.Get(context.Background(), hdkey) if err != nil { - if !errors.Is(err, datastore.ErrNotFound) { + if !store.IsNotFound(err) { return nil, fmt.Errorf("getting remote IPNI head CID from datastore: %w", err) } } else { + defer r.Close() + v, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("reading IPNI head CID: %w", err) + } c, err := cid.Cast(v) if err != nil { return nil, fmt.Errorf("parsing remote IPNI head CID: %w", err) } hd = 
cidlink.Link{Cid: c} } - return &HeadState{ds: ds, cached: hd}, nil + return &HeadState{ds: ds, hdkey: hdkey, cached: hd}, nil } func (h *HeadState) Get(ctx context.Context) ipld.Link { @@ -43,7 +49,7 @@ func (h *HeadState) Get(ctx context.Context) ipld.Link { } func (h *HeadState) Set(ctx context.Context, head ipld.Link) error { - err := h.ds.Put(ctx, h.hdkey, []byte(head.Binary())) + err := h.ds.Put(ctx, h.hdkey, bytes.NewReader([]byte(head.Binary()))) if err != nil { return fmt.Errorf("saving remote IPNI sync'd head: %w", err) } diff --git a/pkg/service/providerindex/notifier/notifier.go b/pkg/service/providerindex/notifier/notifier.go index e21f2c5..6c2060c 100644 --- a/pkg/service/providerindex/notifier/notifier.go +++ b/pkg/service/providerindex/notifier/notifier.go @@ -6,13 +6,13 @@ import ( "net/url" "time" - "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" ipnifind "github.com/ipni/go-libipni/find/client" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/storacha/indexing-service/pkg/service/providerindex/store" ) var log = logging.Logger("publisher") @@ -57,26 +57,40 @@ func (n *Notifier) Start(ctx context.Context) { ticker.Stop() return case <-ticker.C: - head, err := GetLastAdvertisement(ctx, n.client, n.provider) + synced, ts, err := n.Update(ctx) if err != nil { - log.Errorf("fetching last advert CID: %w", err) + log.Errorf(err.Error()) continue } - prev := n.head.Get(ctx) - if !DidSync(head, prev) { - log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(n.ts)) + if !synced { + log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(ts)) continue } - for _, f := range n.notify { - f(ctx, head, prev) - } - n.head.Set(ctx, head) - n.ts = time.Now() } } }() } +func (n *Notifier) Update(ctx context.Context) (bool, time.Time, error) { + head, err := GetLastAdvertisement(ctx, n.client, n.provider) + if err != nil { + return false, n.ts, fmt.Errorf("fetching last advert CID: %w", err) + } + prev := n.head.Get(ctx) + if !DidSync(head, prev) { + return false, n.ts, nil + } + err = n.head.Set(ctx, head) + if err != nil { + return false, n.ts, fmt.Errorf("updating head state: %w", err) + } + for _, f := range n.notify { + f(ctx, head, prev) + } + n.ts = time.Now() + return true, n.ts, nil +} + func (n *Notifier) Notify(f NotifyRemoteSyncFunc) { n.notify = append(n.notify, f) } @@ -114,13 +128,13 @@ func NewRemoteSyncNotifier(addr string, id crypto.PrivKey, head NotifierHead) (* return &Notifier{client: c, head: head, ts: time.Now(), provider: provider}, nil } -func NewNotifierWithStorage(addr string, id crypto.PrivKey, ds datastore.Batching) (*Notifier, error) { +func NewNotifierWithStorage(addr string, id crypto.PrivKey, store store.Store) (*Notifier, error) { addrURL, err := url.Parse(addr) if err != nil { return nil, fmt.Errorf("parsing URL for remote sync notifications: %w", err) } - headState, err := NewHeadState(ds, addrURL.Hostname()) + headState, err := NewHeadState(store, addrURL.Hostname()) if err != nil { return nil, fmt.Errorf("error setting up notification tracking") } diff --git a/pkg/service/providerindex/store/store.go b/pkg/service/providerindex/store/store.go index 9004b82..9a3cd0e 100644 --- a/pkg/service/providerindex/store/store.go +++ b/pkg/service/providerindex/store/store.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "iter" "os" @@ -28,13 +29,26 @@ import ( ) var log = 
logging.Logger("store") -var ErrNotFound = errors.New("not found") + +type ErrNotFound struct { + underlying error +} + +func NewErrNotFound(underlying error) ErrNotFound { + return ErrNotFound{underlying: underlying} +} + +func (e ErrNotFound) Error() string { + return fmt.Sprintf("unable to find underlying: %s", e.underlying.Error()) +} + +func (e ErrNotFound) Unwrap() error { + return e.underlying +} const ( keyToMetadataMapPrefix = "map/keyMD/" keyToChunkLinkMapPrefix = "map/keyChunkLink/" - entriesPrefix = "entries/" - latestAdvKey = "sync/adv" headKey = "head" ) @@ -186,7 +200,7 @@ func (s *AdStore) DeleteMetadataForProviderAndContextID(ctx context.Context, p p return s.metadata.Delete(ctx, p, contextID) } -func NewAdvertStore(store Store, chunkLinks, metadata ProviderContextTable) *AdStore { +func NewPublisherStore(store Store, chunkLinks, metadata ProviderContextTable) *AdStore { return &AdStore{store, chunkLinks, metadata} } @@ -373,7 +387,8 @@ func toChunk(mhs []multihash.Multihash, next ipld.Link) *schema.EntryChunk { func IsNotFound(err error) bool { // solve for the unfortuante lack of standards on not found errors - return errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFound) + var errNotFound ErrNotFound + return errors.Is(err, datastore.ErrNotFound) || errors.As(err, &errNotFound) } func providerContextKey(provider peer.ID, contextID []byte) datastore.Key { @@ -460,8 +475,12 @@ func asCID(link ipld.Link) cid.Cid { return cid.MustParse(link.String()) } +func SimpleStoreFromDatastore(ds datastore.Batching) Store { + return &dsStoreAdapter{ds} +} + func FromDatastore(ds datastore.Batching) FullStore { - return NewAdvertStore( + return NewPublisherStore( &dsStoreAdapter{ds}, &dsProviderContextTable{namespace.Wrap(ds, datastore.NewKey(keyToChunkLinkMapPrefix))}, &dsProviderContextTable{namespace.Wrap(ds, datastore.NewKey(keyToMetadataMapPrefix))}, @@ -472,5 +491,5 @@ func FromLocalStore(storagePath string, ds datastore.Batching) FullStore { store := &directoryStore{storagePath} chunkLinksStore := &dsProviderContextTable{namespace.Wrap(ds, datastore.NewKey(keyToChunkLinkMapPrefix))} mdStore := &dsProviderContextTable{namespace.Wrap(ds, datastore.NewKey(keyToMetadataMapPrefix))} - return NewAdvertStore(store, chunkLinksStore, mdStore) + return NewPublisherStore(store, chunkLinksStore, mdStore) }
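
A minimal sketch of how the new not-found handling is meant to be consumed: S3Store.Get and DynamoProviderContextTable.Get wrap missing records in store.ErrNotFound, so callers branch on store.IsNotFound rather than comparing against datastore.ErrNotFound. The readAdvert helper below is illustrative only (not part of the patch); it assumes a Config produced by aws.FromEnv and reuses the IPNI store bucket and key prefix from that config.

package example

import (
	"context"
	"fmt"
	"io"

	"github.com/storacha/indexing-service/pkg/aws"
	"github.com/storacha/indexing-service/pkg/service/providerindex/store"
)

// readAdvert fetches a published IPNI advert by key, treating "not found" as a
// distinct condition from transport or permission failures.
func readAdvert(ctx context.Context, cfg aws.Config, key string) ([]byte, error) {
	ipniStore := aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix)
	r, err := ipniStore.Get(ctx, key)
	if err != nil {
		if store.IsNotFound(err) {
			return nil, fmt.Errorf("advert %q has not been published: %w", key, err)
		}
		return nil, fmt.Errorf("reading advert %q: %w", key, err)
	}
	defer r.Close()
	return io.ReadAll(r)
}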