Skip to content

Commit

Permalink
feat: add load method with partition (#34)
Browse files Browse the repository at this point in the history
* feat: add load method with partition

* feat: support multi partition
  • Loading branch information
deryrahman authored Sep 26, 2024
1 parent 2bb3887 commit 5f3867f
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 11 deletions.
46 changes: 37 additions & 9 deletions max2max/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ import (
"fmt"
"log/slog"
"os"
"strings"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type Loader interface {
GetQuery(tableID, query string) string
GetPartitionedQuery(tableID, query string, partitionName []string) string
}

type client struct {
logger *slog.Logger
odpsIns *odps.Odps
logger *slog.Logger
odpsClient *odps.Odps
}

func NewClient(logger *slog.Logger, odpsIns *odps.Odps) *client {
func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *client {
return &client{
logger: logger,
odpsIns: odpsIns,
logger: logger,
odpsClient: odpsClient,
}
}

Expand All @@ -32,18 +34,44 @@ func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {
return err
}

// check if table is partitioned
c.logger.Info(fmt.Sprintf("checking if table %s is partitioned", tableID))
partitionNames, err := c.getPartitionNames(tableID)
if err != nil {
return err
}

// prepare query
queryToExec := loader.GetQuery(tableID, string(queryRaw))
if len(partitionNames) > 0 {
c.logger.Info(fmt.Sprintf("table %s is partitioned by %s", tableID, strings.Join(partitionNames, ", ")))
queryToExec = loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
}

// execute query with odps client
c.logger.Info(fmt.Sprintf("execute: %s", string(queryRaw)))
ins, err := c.odpsIns.ExecSQl(loader.GetQuery(tableID, string(queryRaw)))
c.logger.Info(fmt.Sprintf("execute: %s", queryToExec))
taskIns, err := c.odpsClient.ExecSQl(queryToExec)
if err != nil {
return err
}
c.logger.Info(fmt.Sprintf("taskId: %s", ins.Id()))

// wait execution success
if err := ins.WaitForSuccess(); err != nil {
c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id()))
if err := taskIns.WaitForSuccess(); err != nil {
return err
}
c.logger.Info("execution done")
return nil
}

func (c *client) getPartitionNames(tableID string) ([]string, error) {
table := c.odpsClient.Table(tableID)
if err := table.Load(); err != nil {
return nil, err
}
var partitionNames []string
for _, partition := range table.Schema().PartitionColumns {
partitionNames = append(partitionNames, partition.Name)
}
return partitionNames, nil
}
5 changes: 5 additions & 0 deletions max2max/internal/loader/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loader
import (
"fmt"
"log/slog"
"strings"
)

type appendLoader struct {
Expand All @@ -18,3 +19,7 @@ func NewAppendLoader(logger *slog.Logger) *appendLoader {
func (l *appendLoader) GetQuery(tableID, query string) string {
return fmt.Sprintf("INSERT INTO TABLE %s %s", tableID, query)
}

func (l *appendLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string {
return fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s", tableID, strings.Join(partitionNames, ", "), query)
}
5 changes: 3 additions & 2 deletions max2max/internal/loader/factory.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package loader

import (
"errors"
"fmt"
"log/slog"
)

type Loader interface {
GetQuery(tableID, query string) string
GetPartitionedQuery(tableID, query string, partitionName []string) string
}

func GetLoader(name string, logger *slog.Logger) (Loader, error) {
Expand All @@ -22,6 +23,6 @@ func GetLoader(name string, logger *slog.Logger) (Loader, error) {
case MERGE_REPLACE:
return NewMergeReplaceLoader(logger), nil
default:
return nil, errors.New("loader not found")
return nil, fmt.Errorf("loader %s not found", name)
}
}
4 changes: 4 additions & 0 deletions max2max/internal/loader/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func NewMergeLoader(logger *slog.Logger) *mergeLoader {
func (l *mergeLoader) GetQuery(tableID, query string) string {
return "-- TODO merge loader"
}

func (l *mergeLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return "-- TODO merge loader"
}
4 changes: 4 additions & 0 deletions max2max/internal/loader/merge_replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func NewMergeReplaceLoader(logger *slog.Logger) *mergeReplaceLoader {
func (l *mergeReplaceLoader) GetQuery(tableID, query string) string {
return "-- TODO merge replace loader"
}

func (l *mergeReplaceLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return "-- TODO merge replace loader"
}
4 changes: 4 additions & 0 deletions max2max/internal/loader/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func NewReplaceLoader(logger *slog.Logger) *replaceLoader {
func (l *replaceLoader) GetQuery(tableID, query string) string {
return "-- TODO replace loader"
}

func (l *replaceLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return "-- TODO replace loader"
}
4 changes: 4 additions & 0 deletions max2max/internal/loader/replace_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func NewReplaceAllLoader(logger *slog.Logger) *replaceAllLoader {
func (l *replaceAllLoader) GetQuery(tableID, query string) string {
return "-- TODO replace all loader"
}

func (l *replaceAllLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return "-- TODO replace all loader"
}

0 comments on commit 5f3867f

Please sign in to comment.