Skip to content

Commit

Permalink
feat: initial max2max plugin (#32)
Browse files Browse the repository at this point in the history
* feat: initial max2max poc

* chore: use amd64 alpine for max2max

* feat: use service account format for odps client

* restructure: max2max project

* chore: setup github workflow for max2max
  • Loading branch information
deryrahman authored Sep 25, 2024
1 parent 64a2bb8 commit 2bb3887
Show file tree
Hide file tree
Showing 19 changed files with 687 additions and 13 deletions.
21 changes: 11 additions & 10 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ on:
workflow_dispatch:

jobs:
build:
build-max2max:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up Go
uses: actions/setup-go@v2
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.20'
- name: Run GoReleaser for branch [Main]
uses: goreleaser/[email protected]
with:
distribution: goreleaser
version: latest
args: -f .goreleaser.yml --snapshot --clean
go-version: 1.22
# Build the Go binary
- name: Build
run: |
cd max2max
mkdir build
go get .
env GOOS=linux GOARCH=amd64 go build -o ./build/max2max .
48 changes: 47 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
tags:
- 'bq2bq/v*'
- 'max2max/v*'

jobs:
bq2bq:
Expand Down Expand Up @@ -39,4 +40,49 @@ jobs:
docker.io/gotocompany/optimus-task-bq2bq-executor:latest
docker.io/gotocompany/optimus-task-bq2bq-executor:${{ steps.vars.outputs.tag }}
- name: Log out from Docker Hub
run: docker logout
run: docker logout
max2max:
if: startsWith(github.ref, 'refs/tags/max2max/v')
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22
# Build the Go binary
- name: Build
run: |
cd max2max
mkdir build
go get .
env GOOS=linux GOARCH=amd64 go build -o ./build/max2max .
- name: Login to DockerHub
uses: docker/login-action@v1
with:
registry: docker.io
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
# Extract the Git tag version
- name: Extract tag version
id: vars
run: |
# Extract the tag name from GITHUB_REF, remove 'refs/tags/max2max/' prefix
TAG="${GITHUB_REF#refs/tags/max2max/}"
echo "Tag name: $TAG"
echo "::set-output name=tag::$TAG"
# Build and push the Docker image to Docker Hub
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./max2max
platforms: linux/amd64 # Specify the target platforms
push: true
tags: |
docker.io/gotocompany/max2max:latest
docker.io/gotocompany/max2max:${{ steps.vars.outputs.tag }}
- name: Log out from Docker Hub
run: docker logout
21 changes: 19 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
workflow_dispatch:

jobs:
executor_bq2bq:
bq2bq:
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand All @@ -23,4 +23,21 @@ jobs:
run: |
cd ./bq2bq
chmod +x ./run_coverage.sh
./run_coverage.sh
./run_coverage.sh
max2max:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22
# Build the Go binary
- name: Build
run: |
cd max2max
go get .
go test ./...
3 changes: 3 additions & 0 deletions max2max/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/
dist/
.env
6 changes: 6 additions & 0 deletions max2max/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM amd64/alpine:3

RUN apk --no-cache add tzdata
COPY ./build/max2max /usr/local/bin/max2max

ENTRYPOINT ["/usr/local/bin/max2max"]
19 changes: 19 additions & 0 deletions max2max/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/goto/maxcompute-transformation

go 1.22.3

require github.com/aliyun/aliyun-odps-go-sdk v0.3.4

require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
272 changes: 272 additions & 0 deletions max2max/go.sum

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions max2max/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package client

import (
"fmt"
"log/slog"
"os"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type Loader interface {
GetQuery(tableID, query string) string
}

type client struct {
logger *slog.Logger
odpsIns *odps.Odps
}

func NewClient(logger *slog.Logger, odpsIns *odps.Odps) *client {
return &client{
logger: logger,
odpsIns: odpsIns,
}
}

func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {
// read query from filepath
c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath))
queryRaw, err := os.ReadFile(queryFilePath)
if err != nil {
return err
}

// execute query with odps client
c.logger.Info(fmt.Sprintf("execute: %s", string(queryRaw)))
ins, err := c.odpsIns.ExecSQl(loader.GetQuery(tableID, string(queryRaw)))
if err != nil {
return err
}
c.logger.Info(fmt.Sprintf("taskId: %s", ins.Id()))

// wait execution success
if err := ins.WaitForSuccess(); err != nil {
return err
}
c.logger.Info("execution done")
return nil
}
56 changes: 56 additions & 0 deletions max2max/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package config

import (
"encoding/json"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type Config struct {
*odps.Config
LogLevel string
LoadMethod string
QueryFilePath string
DestinationTableID string
}

type maxComputeCredentials struct {
AccessId string `json:"access_id"`
AccessKey string `json:"access_key"`
Endpoint string `json:"endpoint"`
ProjectName string `json:"project_name"`
}

func NewConfig() (*Config, error) {
cfg := &Config{
Config: odps.NewConfig(),
// max2max related config
LogLevel: getEnv("LOG_LEVEL", "INFO"),
LoadMethod: getEnv("LOAD_METHOD", "APPEND"),
QueryFilePath: getEnv("QUERY_FILE_PATH", ""),
DestinationTableID: getEnv("DESTINATION_TABLE_ID", ""),
}
// ali-odps-go-sdk related config
scvAcc := getEnv("SERVICE_ACCOUNT", "")
cred, err := collectMaxComputeCredential([]byte(scvAcc))
if err != nil {
return nil, err
}
cfg.Config.AccessId = cred.AccessId
cfg.Config.AccessKey = cred.AccessKey
cfg.Config.Endpoint = cred.Endpoint
cfg.Config.ProjectName = cred.ProjectName
cfg.Config.HttpTimeout = getEnvDuration("MAXCOMPUTE_HTTP_TIMEOUT", "10s")
cfg.Config.TcpConnectionTimeout = getEnvDuration("MAXCOMPUTE_TCP_TIMEOUT", "30s")

return cfg, nil
}

func collectMaxComputeCredential(scvAcc []byte) (*maxComputeCredentials, error) {
var creds maxComputeCredentials
if err := json.Unmarshal(scvAcc, &creds); err != nil {
return nil, err
}

return &creds, nil
}
18 changes: 18 additions & 0 deletions max2max/internal/config/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package config

import (
"os"
"time"
)

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}

func getEnvDuration(key, fallback string) time.Duration {
result, _ := time.ParseDuration(getEnv(key, fallback))
return result
}
20 changes: 20 additions & 0 deletions max2max/internal/loader/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package loader

import (
"fmt"
"log/slog"
)

type appendLoader struct {
logger *slog.Logger
}

func NewAppendLoader(logger *slog.Logger) *appendLoader {
return &appendLoader{
logger: logger,
}
}

func (l *appendLoader) GetQuery(tableID, query string) string {
return fmt.Sprintf("INSERT INTO TABLE %s %s", tableID, query)
}
9 changes: 9 additions & 0 deletions max2max/internal/loader/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package loader

const (
APPEND = "APPEND"
REPLACE = "REPLACE"
REPLACE_ALL = "REPLACE_ALL"
MERGE = "MERGE"
MERGE_REPLACE = "MERGE_REPLACE"
)
27 changes: 27 additions & 0 deletions max2max/internal/loader/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package loader

import (
"errors"
"log/slog"
)

type Loader interface {
GetQuery(tableID, query string) string
}

func GetLoader(name string, logger *slog.Logger) (Loader, error) {
switch name {
case APPEND:
return NewAppendLoader(logger), nil
case REPLACE:
return NewReplaceLoader(logger), nil
case REPLACE_ALL:
return NewReplaceAllLoader(logger), nil
case MERGE:
return NewMergeLoader(logger), nil
case MERGE_REPLACE:
return NewMergeReplaceLoader(logger), nil
default:
return nil, errors.New("loader not found")
}
}
19 changes: 19 additions & 0 deletions max2max/internal/loader/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type mergeLoader struct {
logger *slog.Logger
}

func NewMergeLoader(logger *slog.Logger) *mergeLoader {
return &mergeLoader{
logger: logger,
}
}

func (l *mergeLoader) GetQuery(tableID, query string) string {
return "-- TODO merge loader"
}
19 changes: 19 additions & 0 deletions max2max/internal/loader/merge_replace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package loader

import (
"log/slog"
)

type mergeReplaceLoader struct {
logger *slog.Logger
}

func NewMergeReplaceLoader(logger *slog.Logger) *mergeReplaceLoader {
return &mergeReplaceLoader{
logger: logger,
}
}

func (l *mergeReplaceLoader) GetQuery(tableID, query string) string {
return "-- TODO merge replace loader"
}
Loading

0 comments on commit 2bb3887

Please sign in to comment.