Skip to content

Commit

Permalink
feat(aws): add aws lambdas and infra
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Oct 15, 2024
1 parent 88f3093 commit 1790653
Show file tree
Hide file tree
Showing 17 changed files with 924 additions and 50 deletions.
21 changes: 21 additions & 0 deletions cmd/lambda/getclaims/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

func main() {
config := aws.FromEnv(context.Background())
service, err := aws.Construct(config)
if err != nil {
panic(err)
}
handler := server.GetClaimsHandler(service)
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
}
17 changes: 17 additions & 0 deletions cmd/lambda/getroot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

func main() {
config := aws.FromEnv(context.Background())
handler := server.GetRootHandler(config.Signer)
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
}
42 changes: 42 additions & 0 deletions cmd/lambda/notifier/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"context"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
logging "github.com/ipfs/go-log/v2"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/service/providerindex/notifier"
)

var log = logging.Logger("lambda/notifier")

func makeHandler(notifier *notifier.Notifier) func(ctx context.Context, event events.EventBridgeEvent) {
return func(ctx context.Context, event events.EventBridgeEvent) {
synced, ts, err := notifier.Update(ctx)
if err != nil {
log.Errorf("error during notifier sync head check: %s", err.Error())
return
}
if !synced {
log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(ts))
}
}
}

func main() {
config := aws.FromEnv(context.Background())
// setup IPNI
// TODO: switch to double hashed client for reader privacy?
headStore := aws.NewS3Store(config.Config, config.NotifierHeadBucket, "")
notifier, err := notifier.NewNotifierWithStorage(config.IndexerURL, config.PrivateKey, headStore)
if err != nil {
panic(err)
}
sqsRemoteSyncNotifier := aws.NewSQSRemoteSyncNotifier(config.Config, config.NotifierTopicArn)
notifier.Notify(sqsRemoteSyncNotifier.NotifyRemoteSync)

lambda.Start(makeHandler(notifier))
}
21 changes: 21 additions & 0 deletions cmd/lambda/postclaims/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

func main() {
config := aws.FromEnv(context.Background())
service, err := aws.Construct(config)
if err != nil {
panic(err)
}
handler := server.PostClaimsHandler(config.Signer, service)
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
}
72 changes: 72 additions & 0 deletions cmd/lambda/providercache/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"errors"
"sync"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
logging "github.com/ipfs/go-log/v2"
goredis "github.com/redis/go-redis/v9"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service/providercacher"
)

var log = logging.Logger("lambda/providercache")

func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error {
job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body)
if err != nil {
return err
}
_, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index)
if err != nil {
return err
}
return nil
}

func makeHandler(sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher) func(ctx context.Context, sqsEvent events.SQSEvent) error {
return func(ctx context.Context, sqsEvent events.SQSEvent) error {
// process messages in parallel
results := make(chan error, len(sqsEvent.Records))
var wg sync.WaitGroup
for _, msg := range sqsEvent.Records {
wg.Add(1)
go func(msg events.SQSMessage) {
defer wg.Done()
err := handleMessage(ctx, sqsCachingDecoder, providerCacher, msg)
results <- err
}(msg)
}
wg.Wait()
// collect errors
close(results)
var err error
for nextErr := range results {
err = errors.Join(err, nextErr)
}
// return overall error
if err != nil {
return err
}
for _, msg := range sqsEvent.Records {
err := sqsCachingDecoder.CleanupMessage(ctx, msg.Body)
if err != nil {
log.Warnf("unable to cleanup message fully: %s", err.Error())
}
}
return nil
}
}

func main() {
config := aws.FromEnv(context.Background())
providerRedis := goredis.NewClient(&config.ProvidersRedis)
providerStore := redis.NewProviderStore(providerRedis)
providerCacher := providercacher.NewSimpleProviderCacher(providerStore)
sqsCachingDecoder := aws.NewSQSCachingDecoder(config.Config, config.CachingBucket)
lambda.Start(makeHandler(sqsCachingDecoder, providerCacher))
}
69 changes: 69 additions & 0 deletions cmd/lambda/remotesync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"encoding/json"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
goredis "github.com/redis/go-redis/v9"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service/providercacher"
"github.com/storacha/indexing-service/pkg/service/providerindex"
"github.com/storacha/indexing-service/pkg/service/providerindex/store"
)

var log = logging.Logger("lambda/providercache")

Check failure on line 20 in cmd/lambda/remotesync/main.go

View workflow job for this annotation

GitHub Actions / go-check / All

var log is unused (U1000)

func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error {

Check failure on line 22 in cmd/lambda/remotesync/main.go

View workflow job for this annotation

GitHub Actions / go-check / All

func handleMessage is unused (U1000)
job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body)
if err != nil {
return err
}
_, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index)
if err != nil {
return err
}
return nil
}

func makeHandler(remoteSyncer *providerindex.RemoteSyncer) func(ctx context.Context, snsEvent events.SNSEvent) error {
return func(ctx context.Context, snsEvent events.SNSEvent) error {
for _, record := range snsEvent.Records {
snsRecord := record.SNS
var snsRemoteSyncMessage aws.SNSRemoteSyncMessage
err := json.Unmarshal([]byte(snsRecord.Message), &snsRemoteSyncMessage)
if err != nil {
return err
}
headCid, err := cid.Parse(snsRemoteSyncMessage.Head)
if err != nil {
return err
}
head := cidlink.Link{Cid: headCid}
prevCid, err := cid.Parse(snsRemoteSyncMessage.Prev)
if err != nil {
return err
}
prev := cidlink.Link{Cid: prevCid}
remoteSyncer.HandleRemoteSync(ctx, head, prev)
}
return nil
}
}

func main() {
cfg := aws.FromEnv(context.Background())
providerRedis := goredis.NewClient(&cfg.ProvidersRedis)
providerStore := redis.NewProviderStore(providerRedis)
ipniStore := aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix)
chunkLinksTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName)
metadataTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName)
publisherStore := store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable)
remoteSyncer := providerindex.NewRemoteSyncer(providerStore, publisherStore)
lambda.Start(makeHandler(remoteSyncer))
}
30 changes: 29 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23
toolchain go1.23.0

require (
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ds-flatfs v0.5.1
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e
Expand All @@ -21,8 +22,26 @@ require (

require (
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pion/ice/v2 v2.3.35 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand All @@ -32,6 +51,15 @@ require (
)

require (
github.com/aws/aws-lambda-go v1.47.0
github.com/aws/aws-sdk-go-v2/config v1.27.43
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2
github.com/aws/aws-sdk-go-v2/service/sns v1.33.2
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
github.com/awslabs/aws-lambda-go-api-proxy v0.16.2
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
Expand All @@ -40,7 +68,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
Expand Down
Loading

0 comments on commit 1790653

Please sign in to comment.