Skip to content

Commit

Permalink
Better log and error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
rjtokenring committed Sep 18, 2024
1 parent b77b7d7 commit cebb158
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
4 changes: 4 additions & 0 deletions app/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func StartExporter(
}

if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
if writer != nil {
writer.Close()
defer writer.Delete()
}
logger.Error("Error aligning time series samples: ", err)
return err
} else {
Expand Down
17 changes: 17 additions & 0 deletions business/tsextractor/tsextractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tsextractor
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -87,6 +88,7 @@ func (a *TsExtractor) ExportTSToFile(

var wg sync.WaitGroup
tokens := make(chan struct{}, importConcurrency)
errorChannel := make(chan error, len(thingsMap))

if isRawResolution(resolution) {
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw")
Expand All @@ -112,20 +114,23 @@ func (a *TsExtractor) ExportTSToFile(
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
if err != nil {
a.logger.Error("Error populating raw time series data: ", err)
errorChannel <- err
return
}
} else {
// Populate numeric time series data
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer)
if err != nil {
a.logger.Error("Error populating time series data: ", err)
errorChannel <- err
return
}

// Populate string time series data, if any
err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
if err != nil {
a.logger.Error("Error populating string time series data: ", err)
errorChannel <- err
return
}
}
Expand All @@ -135,6 +140,18 @@ func (a *TsExtractor) ExportTSToFile(
// Wait for all routines termination
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
wg.Wait()
close(errorChannel)

// Check if there were errors
detectedErrors := false
for err := range errorChannel {
if err != nil {
a.logger.Error(err)
}
}
if detectedErrors {
return writer, from, errors.New("errors detected during data export")
}

return writer, from, nil
}
Expand Down
5 changes: 3 additions & 2 deletions lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type AWSS3ImportTrigger struct {
const (
GlobalArduinoPrefix = "/arduino/s3-importer"

// Compatibility parameters for backward compatibility
// Parameters for backward compatibility
IoTApiKey = GlobalArduinoPrefix + "/iot/api-key"
IoTApiSecret = GlobalArduinoPrefix + "/iot/api-secret"
IoTApiOrgId = GlobalArduinoPrefix + "/iot/org-id"
Expand Down Expand Up @@ -190,7 +190,8 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err

err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression, enableAlignTimeWindow)
if err != nil {
return nil, err
message := "Error detected during data export"
return &message, err
}

message := "Data exported successfully"
Expand Down

0 comments on commit cebb158

Please sign in to comment.