Skip to content

Commit

Permalink
chore: error handling and graceful shutdown (#37)
Browse files Browse the repository at this point in the history
* chore: graceful shutdown

* chore: move entrypoint implementation to dedicated file

* chore: error handling

* fix: dynamic link

* feat: terminate the job as well when fail to fetch log
  • Loading branch information
deryrahman authored Oct 11, 2024
1 parent 2440607 commit 42a6804
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
cd max2max
mkdir build
go get .
env GOOS=linux GOARCH=amd64 go build -o ./build/max2max .
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/max2max .
- name: Login to DockerHub
uses: docker/login-action@v1
with:
Expand Down
40 changes: 23 additions & 17 deletions max2max/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package client

import (
"errors"
"context"
e "errors"
"fmt"
"log/slog"
"os"
"strings"

"github.com/pkg/errors"
)

type Loader interface {
Expand All @@ -14,65 +17,68 @@ type Loader interface {
}

type OdpsClient interface {
GetPartitionNames(tableID string) ([]string, error)
ExecSQL(query string) error
GetPartitionNames(ctx context.Context, tableID string) ([]string, error)
ExecSQL(ctx context.Context, query string) error
}

// Client bundles the collaborators used by Execute to run a query file
// against a table, plus the state needed to shut down cleanly.
type Client struct {
// OdpsClient is used by Execute to look up partition names and run the SQL.
OdpsClient OdpsClient
// Loader is used by Execute to build the final query from the raw query text.
Loader Loader

// appCtx is the context given to NewClient; setup functions read it
// (SetupOTelSDK passes it to setupOTelSDK).
appCtx context.Context
// logger is assigned by SetupLogger; it stays nil if that setup function is not used.
logger *slog.Logger
// shutdownFns are run in order by Close, which joins their errors.
shutdownFns []func() error
}

func NewClient(setupFns ...SetupFn) (*Client, error) {
func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
c := &Client{
appCtx: ctx,
shutdownFns: make([]func() error, 0),
}
for _, setupFn := range setupFns {
if err := setupFn(c); err != nil {
return nil, err
return nil, errors.WithStack(err)
}
}
return c, nil
}

func (c *Client) Close() error {
c.logger.Info("closing client")
var err error
for _, fn := range c.shutdownFns {
err = errors.Join(err, fn())
err = e.Join(err, fn())
}
return err
return errors.WithStack(err)
}

func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error {
func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) error {
// read query from filepath
c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath))
queryRaw, err := os.ReadFile(queryFilePath)
if err != nil {
return err
return errors.WithStack(err)
}

// check if table is partitioned
c.logger.Info(fmt.Sprintf("checking if table %s is partitioned", tableID))
partitionNames, err := c.OdpsClient.GetPartitionNames(tableID)
partitionNames, err := c.OdpsClient.GetPartitionNames(ctx, tableID)
if err != nil {
return err
return errors.WithStack(err)
}

// prepare query
queryToExec := loader.GetQuery(tableID, string(queryRaw))
queryToExec := c.Loader.GetQuery(tableID, string(queryRaw))
if len(partitionNames) > 0 {
c.logger.Info(fmt.Sprintf("table %s is partitioned by %s", tableID, strings.Join(partitionNames, ", ")))
queryToExec = loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
queryToExec = c.Loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
}

// execute query with odps client
c.logger.Info(fmt.Sprintf("execute: %s", queryToExec))
if err := c.OdpsClient.ExecSQL(queryToExec); err != nil {
return err
if err := c.OdpsClient.ExecSQL(ctx, queryToExec); err != nil {
return errors.WithStack(err)
}

c.logger.Info("execution done")
return nil
return errors.WithStack(err)
}
47 changes: 11 additions & 36 deletions max2max/internal/client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client_test

import (
"context"
"fmt"
"os"
"testing"
Expand All @@ -13,17 +14,17 @@ import (
func TestExecute(t *testing.T) {
t.Run("should return error when reading query file fails", func(t *testing.T) {
// arrange
client, err := client.NewClient(client.SetupLogger("error"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{}
// act
err = client.Execute(nil, "", "./nonexistentfile")
err = client.Execute(context.TODO(), "", "./nonexistentfile")
// assert
assert.Error(t, err)
})
t.Run("should return error when getting partition name fails", func(t *testing.T) {
// arrange
client, err := client.NewClient(client.SetupLogger("error"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
Expand All @@ -32,14 +33,14 @@ func TestExecute(t *testing.T) {
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(nil, "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error get partition name")
})
t.Run("should return error when executing query fails", func(t *testing.T) {
// arrange
client, err := client.NewClient(client.SetupLogger("error"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("APPEND"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
Expand All @@ -49,21 +50,16 @@ func TestExecute(t *testing.T) {
return fmt.Errorf("error exec sql")
},
}
loader := &mockLoader{
getQueryResult: func() string {
return "INSERT INTO table SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(loader, "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error exec sql")
})
t.Run("should return nil when everything is successful", func(t *testing.T) {
// arrange
client, err := client.NewClient(client.SetupLogger("error"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("APPEND"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
Expand All @@ -73,17 +69,9 @@ func TestExecute(t *testing.T) {
return nil
},
}
loader := &mockLoader{
getQueryResult: func() string {
return "INSERT INTO table SELECT * FROM table;"
},
getPartitionedQueryResult: func() string {
return "INSERT INTO table PARTITION (event_date) SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(loader, "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
// assert
assert.NoError(t, err)
})
Expand All @@ -94,23 +82,10 @@ type mockOdpsClient struct {
execSQLResult func() error
}

func (m *mockOdpsClient) GetPartitionNames(tableID string) ([]string, error) {
func (m *mockOdpsClient) GetPartitionNames(ctx context.Context, tableID string) ([]string, error) {
return m.partitionResult()
}

func (m *mockOdpsClient) ExecSQL(query string) error {
func (m *mockOdpsClient) ExecSQL(ctx context.Context, query string) error {
return m.execSQLResult()
}

type mockLoader struct {
getQueryResult func() string
getPartitionedQueryResult func() string
}

func (m *mockLoader) GetQuery(tableID, query string) string {
return m.getQueryResult()
}

func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return m.getPartitionedQueryResult()
}
45 changes: 39 additions & 6 deletions max2max/internal/client/odps.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package client

import (
"context"
e "errors"
"fmt"
"log/slog"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/pkg/errors"
)

// odpsClient pairs an ODPS SDK handle with a logger. Its ExecSQL and
// GetPartitionNames methods are built on top of these two fields.
type odpsClient struct {
// logger receives the log-view URL and task id messages emitted by ExecSQL.
logger *slog.Logger
// client is the underlying SDK handle used to submit SQL and load table metadata.
client *odps.Odps
}

// NewODPSClient creates a new odpsClient instance
func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
return &odpsClient{
logger: logger,
Expand All @@ -20,26 +24,55 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
}

// ExecSQL executes the given query in synchronous mode (blocking)
// TODO: change the execution mode to async and do graceful shutdown
func (c *odpsClient) ExecSQL(query string) error {
// with capability to do graceful shutdown by terminating task instance
// when context is cancelled.
func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
taskIns, err := c.client.ExecSQl(query)
if err != nil {
return err
return errors.WithStack(err)
}

// generate log view
url, err := odps.NewLogView(c.client).GenerateLogView(taskIns, 1)
if err != nil {
err = e.Join(err, taskIns.Terminate())
return errors.WithStack(err)
}
c.logger.Info(fmt.Sprintf("log view: %s", url))

// wait execution success
c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id()))
return taskIns.WaitForSuccess()
select {
case <-ctx.Done():
c.logger.Info("context cancelled, terminating task instance")
err := taskIns.Terminate()
return e.Join(ctx.Err(), err)
case err := <-wait(taskIns):
return errors.WithStack(err)
}
}

func (c *odpsClient) GetPartitionNames(tableID string) ([]string, error) {
// GetPartitionNames returns the partition names of the given table
// by querying the table schema.
func (c *odpsClient) GetPartitionNames(_ context.Context, tableID string) ([]string, error) {
table := c.client.Table(tableID)
if err := table.Load(); err != nil {
return nil, err
return nil, errors.WithStack(err)
}
var partitionNames []string
for _, partition := range table.Schema().PartitionColumns {
partitionNames = append(partitionNames, partition.Name)
}
return partitionNames, nil
}

// wait runs taskIns.WaitForSuccess on a separate goroutine and returns a
// channel that yields its result (wrapped with a stack trace when non-nil)
// exactly once, after which the channel is closed.
//
// The channel is buffered with capacity one so the goroutine can always
// deliver its result and exit, even when the caller has stopped listening,
// e.g. ExecSQL returning early because its context was cancelled. With an
// unbuffered channel that send would block forever and leak the goroutine.
func wait(taskIns *odps.Instance) <-chan error {
	errChan := make(chan error, 1)
	go func() {
		defer close(errChan)
		errChan <- errors.WithStack(taskIns.WaitForSuccess())
	}()
	return errChan
}
8 changes: 4 additions & 4 deletions max2max/internal/client/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package client
import (
"context"

"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

func setupOTelSDK(collectorGRPCEndpoint string, jobName, scheduledTime string) (shutdown func() error, err error) {
ctx := context.Background() // TODO: use context from main
func setupOTelSDK(ctx context.Context, collectorGRPCEndpoint string, jobName, scheduledTime string) (shutdown func() error, err error) {
metricExporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint(collectorGRPCEndpoint),
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

// for now, we only need metric provider
Expand All @@ -33,6 +33,6 @@ func setupOTelSDK(collectorGRPCEndpoint string, jobName, scheduledTime string) (
otel.SetMeterProvider(meterProvider)

return func() error {
return meterProvider.Shutdown(ctx)
return meterProvider.Shutdown(context.Background())
}, nil
}
19 changes: 16 additions & 3 deletions max2max/internal/client/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package client

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/goto/transformers/max2max/internal/loader"
"github.com/goto/transformers/max2max/internal/logger"
"github.com/pkg/errors"
)

// SetupFn configures a Client while it is being constructed. NewClient runs
// the supplied SetupFns in order and stops at the first one that returns an error.
type SetupFn func(c *Client) error
Expand All @@ -11,7 +13,7 @@ func SetupLogger(logLevel string) SetupFn {
return func(c *Client) error {
logger, err := logger.NewLogger(logLevel)
if err != nil {
return err
return errors.WithStack(err)
}
c.logger = logger
return nil
Expand All @@ -30,11 +32,22 @@ func SetupOTelSDK(collectorGRPCEndpoint, jobName, scheduledTime string) SetupFn
if collectorGRPCEndpoint == "" {
return nil
}
shutdownFn, err := setupOTelSDK(collectorGRPCEndpoint, jobName, scheduledTime)
shutdownFn, err := setupOTelSDK(c.appCtx, collectorGRPCEndpoint, jobName, scheduledTime)
if err != nil {
return err
return errors.WithStack(err)
}
c.shutdownFns = append(c.shutdownFns, shutdownFn)
return nil
}
}

// SetupLoader returns a SetupFn that resolves the loader for loadMethod and
// stores it on the client as c.Loader.
//
// The client's current logger is handed to loader.GetLoader as-is, so the
// logger should already be configured (e.g. via SetupLogger) by the time this
// setup function runs.
func SetupLoader(loadMethod string) SetupFn {
	return func(c *Client) error {
		// Use a name that does not shadow the imported loader package.
		resolved, err := loader.GetLoader(loadMethod, c.logger)
		if err != nil {
			return errors.WithStack(err)
		}

		c.Loader = resolved
		return nil
	}
}
Loading

0 comments on commit 42a6804

Please sign in to comment.