Added configurable scheduling selection and moving to rate scheduling (
rjtokenring authored Sep 13, 2024
1 parent c4e7031 commit ce274c0
Showing 12 changed files with 300 additions and 79 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -23,5 +23,6 @@ vendor/
cover.out

dist/
deployment/binaries/

coverage_unit.txt
27 changes: 15 additions & 12 deletions README.md
@@ -2,7 +2,9 @@

This project provides a way to extract time series samples from Arduino cloud, publishing to a S3 destination bucket.
Data are extracted at the given resolution via a scheduled Lambda function. Then samples are stored in CSV files and saved to S3.
By default, data extraction is performed every hour, extracting samples aggregated at 5min resolution. Non numeric values like strings are sampled at the given resolution.
By default, data extraction is performed every hour (configurable), extracting samples aggregated at 5min resolution (configurable).
Aggregation is performed as the average over the aggregation period.
Non numeric values like strings are sampled at the given resolution.

## Architecture

@@ -22,9 +24,9 @@ timestamp,thing_id,thing_name,property_id,property_name,property_type,value

Files are organized by date and files of the same day are grouped.
```
<bucket>:2024-09-04/2024-09-04-10.csv
<bucket>:2024-09-04/2024-09-04-11.csv
<bucket>:2024-09-04/2024-09-04-12.csv
<bucket>:2024-09-04/2024-09-04-10-00.csv
<bucket>:2024-09-04/2024-09-04-11-00.csv
<bucket>:2024-09-04/2024-09-04-12-00.csv
```

## Deployment via Cloud Formation Template
@@ -45,7 +47,7 @@ Before stack creation, two S3 buckets have to be created:
bucket must be in the same region where stack will be created.

Follow these steps to deploy a new stack:
* download [lambda code binaries](deployment/binaries/arduino-s3-integration-lambda.zip) and [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
* download [lambda code binaries](https://github.com/arduino/aws-s3-integration/releases) and [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
* upload CFT and binary zip file on an S3 bucket accessible by the AWS account. For the CFT yaml file, copy the Object URL (it will be required in next step).

![object URL](docs/objecturl.png)
@@ -67,12 +69,13 @@ These parameters are filled by CFT at stack creation time and can be adjusted la

| Parameter | Description |
| --------- | ----------- |
| /arduino/s3-importer/iot/api-key | IoT API key |
| /arduino/s3-importer/iot/api-secret | IoT API secret |
| /arduino/s3-importer/iot/org-id | (optional) organization id |
| /arduino/s3-importer/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
| /arduino/s3-importer/iot/samples-resolution-seconds | (optional) samples resolution (default: 300s) |
| /arduino/s3-importer/destination-bucket | S3 destination bucket |
| /arduino/s3-exporter/<stack-name>/iot/api-key | IoT API key |
| /arduino/s3-exporter/<stack-name>/iot/api-secret | IoT API secret |
| /arduino/s3-exporter/<stack-name>/iot/org-id | (optional) organization id |
| /arduino/s3-exporter/<stack-name>/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
| /arduino/s3-exporter/<stack-name>/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
| /arduino/s3-exporter/<stack-name>/destination-bucket | S3 destination bucket |
| /arduino/s3-exporter/<stack-name>/iot/scheduling | Execution scheduling |

### Tag filtering

@@ -85,7 +88,7 @@ You can use tag filtering if you need to reduce export to a specific set of Thin

![tag 1](docs/tag-1.png)

* Configure tag filter during CFT creation of by editing '/arduino/s3-importer/iot/filter/tags' parameter (syntax: tag1=value1,tag2=value2).
* Configure tag filter during CFT creation or by editing '/arduino/s3-exporter/<stack-name>/iot/filter/tags' parameter (syntax: tag1=value1,tag2=value2).

![tag filter](docs/tag-filter.png)
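
The `tag1=value1,tag2=value2` syntax can be parsed along these lines. This is a minimal sketch, not the project's parser: `parseTagFilter` is a hypothetical name, and treating the `<empty>` placeholder (the CFT default for `TagFilter`) as "no filter" is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTagFilter is a hypothetical helper, not code from the repository.
// It turns "tag1=value1,tag2=value2" into a map of tag names to values.
func parseTagFilter(filter string) (map[string]string, error) {
	tags := make(map[string]string)
	filter = strings.TrimSpace(filter)
	// Assumption: an empty string or the "<empty>" placeholder means no filtering.
	if filter == "" || filter == "<empty>" {
		return tags, nil
	}
	for _, pair := range strings.Split(filter, ",") {
		key, value, found := strings.Cut(strings.TrimSpace(pair), "=")
		if !found || key == "" {
			return nil, fmt.Errorf("invalid tag filter entry %q, expected tag=value", pair)
		}
		tags[key] = value
	}
	return tags, nil
}

func main() {
	tags, err := parseTagFilter("tag1=value1,tag2=value2")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(tags), tags["tag1"], tags["tag2"]) // prints "2 value1 value2"
}
```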

29 changes: 20 additions & 9 deletions business/tsextractor/tsextractor.go
@@ -45,15 +45,25 @@ func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor {
return &TsExtractor{iotcl: iotcl, logger: logger}
}

func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time, time.Time) {
// Compute time alignment
if resolutionSeconds <= 60 {
resolutionSeconds = 300 // Align to 5 minutes
}
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
return from, to
}

func (a *TsExtractor) ExportTSToS3(
ctx context.Context,
timeWindowInMinutes int,
thingsMap map[string]iotclient.ArduinoThing,
resolution int,
destinationS3Bucket string) error {

to := time.Now().Truncate(time.Hour).UTC()
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
// Truncate time to given resolution
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)

// Open s3 output writer
s3cl, err := s3.NewS3Client(destinationS3Bucket)
@@ -70,16 +80,16 @@ func (a *TsExtractor) ExportTSToS3(
var wg sync.WaitGroup
tokens := make(chan struct{}, importConcurrency)

a.logger.Infoln("=====> Export perf data - time window: ", timeWindowInMinutes, " minutes")
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to)
for thingID, thing := range thingsMap {

if thing.Properties == nil || len(thing.Properties) == 0 {
a.logger.Warn("Skipping thing with no properties: ", thingID)
continue
}

wg.Add(1)
tokens <- struct{}{}
wg.Add(1)

go func(thingID string, thing iotclient.ArduinoThing, writer *csv.CsvWriter) {
defer func() { <-tokens }()
@@ -111,13 +121,14 @@ func (a *TsExtractor) ExportTSToS3(
}

// Wait for all routines termination
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
wg.Wait()

// Close csv output writer and upload to s3
writer.Close()
defer writer.Delete()

destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15"))
destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
a.logger.Infof("Uploading file %s to bucket %s\n", destinationKey, s3cl.DestinationBucket())
if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil {
return err
@@ -159,7 +170,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
break
} else {
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
randomRateLimitingSleep()
}
}
@@ -261,7 +272,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
break
} else {
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
randomRateLimitingSleep()
}
}
@@ -321,7 +332,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
break
} else {
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
randomRateLimitingSleep()
}
}
@@ -337,7 +348,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
}

propertyID := strings.Replace(response.Query, "property.", "", 1)
a.logger.Infof("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
a.logger.Debugf("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
sampleCount += response.CountValues

propertyName, propertyType := extractPropertyNameAndType(thing, propertyID)
37 changes: 37 additions & 0 deletions business/tsextractor/tsextractor_test.go
@@ -0,0 +1,37 @@
package tsextractor

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows
from, to := computeTimeAlignment(3600, 60)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func TestTimeAlignment_15minTimeWindows(t *testing.T) {
// Test the time alignment with 15min time windows and hourly resolution
from, to := computeTimeAlignment(3600, 15)
assert.Equal(t, int64(900), to.Unix()-from.Unix())
}

func TestTimeAlignment_15min_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and 15min resolution
from, to := computeTimeAlignment(900, 60)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func TestTimeAlignment_5min_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and 5min resolution
from, to := computeTimeAlignment(300, 60)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and raw resolution (-1)
from, to := computeTimeAlignment(-1, 60)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}
Binary file not shown.
66 changes: 44 additions & 22 deletions deployment/cloud-formation-template/deployment.yaml
@@ -2,11 +2,6 @@ AWSTemplateFormatVersion: '2010-09-09'
Description: Arduino S3 data exporter. For deployment and architectural details, see https://github.com/arduino/aws-s3-integration

Parameters:
LambdaFunctionName:
Type: String
Default: 'arduino-s3-csv-data-exporter'
Description: Name of the Lambda function.

LambdaCodeS3Bucket:
Type: String
Description: S3 bucket where the Lambda function ZIP file is stored.
@@ -30,16 +25,32 @@ Parameters:
Default: '<empty>'
Description: Arduino Organization ID (optional).

ExecutionScheduling:
Type: String
Description: "Choose the execution scheduling for the data export"
AllowedValues:
- 5 minutes
- 15 minutes
- 1 hour
- 1 day
Default: 1 hour

Resolution:
Type: String
Description: "Samples aggregation resolution for data extraction. 'raw' and '1 minute' are not supported for '1 day' scheduling"
AllowedValues:
- raw
- 1 minute
- 5 minutes
- 15 minutes
- 1 hour
Default: 5 minutes

TagFilter:
Type: String
Default: '<empty>'
Description: Filter things to import by tag (optional). Format> tag1=value1,tag2=value2

Resolution:
Type: Number
Default: 300
Description: Samples resolution in seconds. Default is 5 minutes (300s). Set to -1 to export raw data.

DestinationS3Bucket:
Type: String
Description: S3 bucket where CSV files will be stored.
@@ -73,7 +84,7 @@ Resources:
- ssm:GetParameter
- ssm:GetParameters
- ssm:GetParametersByPath
Resource: arn:aws:ssm:*:*:parameter/arduino/s3-importer/*
Resource: arn:aws:ssm:*:*:parameter/arduino/s3-*
- Effect: Allow
Action:
- s3:PutObject
@@ -87,8 +98,7 @@ Resources:
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName:
Ref: LambdaFunctionName
FunctionName: !Sub arduino-s3-csv-data-exporter-${AWS::StackName}
Handler: bootstrap
Role: !GetAtt ArduinoS3LambdaExecutionRole.Arn
Code:
@@ -99,12 +109,15 @@ Resources:
Runtime: provided.al2
Timeout: 900
MemorySize: 256
Environment:
Variables:
STACK_NAME: !Sub ${AWS::StackName}

# Parameters in Parameter Store
ApiKeyParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/iot/api-key
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/api-key
Type: String
Value:
Ref: IotApiKey
@@ -113,7 +126,7 @@ Resources:
ApiSecretParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/iot/api-secret
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/api-secret
Type: String
Value:
Ref: IotApiSecret
@@ -122,7 +135,7 @@ Resources:
OrgIdParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/iot/org-id
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/org-id
Type: String
Value:
Ref: IotOrgId
@@ -131,7 +144,7 @@ Resources:
FilterTagsParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/iot/filter/tags
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/filter/tags
Type: String
Value:
Ref: TagFilter
@@ -140,7 +153,7 @@ Resources:
ResolutionParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/iot/samples-resolution-seconds
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/samples-resolution
Type: String
Value:
Ref: Resolution
@@ -149,17 +162,27 @@ Resources:
DestinationS3BucketParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /arduino/s3-importer/destination-bucket
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/destination-bucket
Type: String
Value:
Ref: DestinationS3Bucket
Tier: Standard

ExecutionSchedulingParameter:
Type: AWS::SSM::Parameter
Properties:
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/scheduling
Type: String
Value:
Ref: ExecutionScheduling
Tier: Standard

# EventBridge Rule to trigger Lambda at the configured rate
EventBridgeRule:
Type: AWS::Events::Rule
Properties:
ScheduleExpression: cron(10 * * * ? *)
ScheduleExpression:
Fn::Sub: "rate(${ExecutionScheduling})"
Targets:
- Arn: !GetAtt LambdaFunction.Arn
Id: LambdaTarget
@@ -170,8 +193,7 @@ Resources:
LambdaPermissionForEventBridge:
Type: AWS::Lambda::Permission
Properties:
FunctionName:
Ref: LambdaFunctionName
FunctionName: !Sub arduino-s3-csv-data-exporter-${AWS::StackName}
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt EventBridgeRule.Arn
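
The template stores `ExecutionScheduling` and `Resolution` as human-readable strings ("1 hour", "5 minutes", "raw"), so the Lambda needs to translate them into a time window and a resolution. The commit hunks shown here do not include that translation; the sketch below is one way it could look. `resolveSettings` and both lookup tables are hypothetical, and mapping `raw` to `-1` is an assumption based on the previous parameter description and the `computeTimeAlignment(-1, 60)` test.

```go
package main

import (
	"fmt"
)

// Hypothetical lookup tables built from the AllowedValues in the template.
var schedulingMinutes = map[string]int{
	"5 minutes":  5,
	"15 minutes": 15,
	"1 hour":     60,
	"1 day":      1440,
}

var resolutionSeconds = map[string]int{
	"raw":        -1, // assumption: -1 still marks raw (non-aggregated) export
	"1 minute":   60,
	"5 minutes":  300,
	"15 minutes": 900,
	"1 hour":     3600,
}

// resolveSettings returns the export window in minutes and the resolution in
// seconds, rejecting the combinations the template description rules out.
func resolveSettings(scheduling, resolution string) (int, int, error) {
	window, ok := schedulingMinutes[scheduling]
	if !ok {
		return 0, 0, fmt.Errorf("unsupported scheduling %q", scheduling)
	}
	res, ok := resolutionSeconds[resolution]
	if !ok {
		return 0, 0, fmt.Errorf("unsupported resolution %q", resolution)
	}
	// 'raw' and '1 minute' are not supported for '1 day' scheduling.
	if scheduling == "1 day" && res <= 60 {
		return 0, 0, fmt.Errorf("resolution %q is not supported with %q scheduling", resolution, scheduling)
	}
	return window, res, nil
}

func main() {
	window, res, err := resolveSettings("1 hour", "5 minutes")
	fmt.Println(window, res, err) // prints "60 300 <nil>"
}
```

The same strings are passed unchanged into `rate(${ExecutionScheduling})`, which is why the allowed values use the singular or plural unit that EventBridge rate expressions expect.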
Binary file modified docs/cft-stack-2.png
4 changes: 4 additions & 0 deletions go.mod
@@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.1
github.com/aws/aws-sdk-go-v2/service/ssm v1.50.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/oauth2 v0.21.0
)

@@ -29,6 +30,9 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.20.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1 change: 1 addition & 0 deletions go.sum
@@ -62,6 +62,7 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=