Skip to content

Commit

Permalink
query with sql
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 21, 2023
1 parent ae6c008 commit 476be4e
Show file tree
Hide file tree
Showing 12 changed files with 533 additions and 419 deletions.
32 changes: 20 additions & 12 deletions base/commands/migration/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,32 @@ import (
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type MigrationStatus struct {
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
type MigrationStatusTotal struct {
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
Migrations []MigrationStatusRow `json:"migrations"`
}

type UpdateMessage struct {
type DataStructureInfo struct {
Name string
Type string
}

type MigrationStatusRow struct {
Name string `json:"name"`
Type string `json:"type"`
Status Status `json:"status"`
CompletionPercentage float32 `json:"completionPercentage"`
Message string `json:"message"`
CompletionPercentage float32 `json:"completion_percentage"`
Error string `json:"error"`
}

var ErrInvalidStatus = errors.New("invalid status value")

func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map) (*MigrationStatus, error) {
v, err := statusMap.Get(ctx, StatusMapEntryName)
func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map, migrationID string) (*MigrationStatusTotal, error) {
v, err := statusMap.Get(ctx, migrationID) //TODO: read only status with sql
if err != nil {
return nil, fmt.Errorf("getting status: %w", err)
}
Expand All @@ -44,7 +52,7 @@ func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map) (*Migrat
} else {
return nil, ErrInvalidStatus
}
var ms MigrationStatus
var ms MigrationStatusTotal
if err := json.Unmarshal(b, &ms); err != nil {
return nil, fmt.Errorf("parsing migration status: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ package migration
const (
StartQueueName = "__datamigration_start_queue"
StatusMapEntryName = "status"
StatusMapPrefix = "__datamigration_"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string

const (
StatusStarted Status = "STARTED"
Canceling Status = "CANCELING"
StatusCanceling Status = "CANCELING"
StatusComplete Status = "COMPLETED"
StatusCanceled Status = "CANCELED"
StatusFailed Status = "FAILED"
Expand Down
184 changes: 184 additions & 0 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//go:build std || migration

package migration

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
errors2 "github.com/hazelcast/hazelcast-commandline-client/errors"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"golang.org/x/exp/slices"
)

func migrationStages(ctx context.Context, ec plug.ExecContext, migrationID, reportOutputDir string, statusMap *hazelcast.Map) ([]stage.Stage[any], error) {
ci, err := ec.ClientInternal(ctx)
if err != nil {
return nil, err
}
if err = waitForMigrationToBeCreated(ctx, ci, migrationID); err != nil {
return nil, err
}
var stages []stage.Stage[any]
dss, err := dataStructuresToBeMigrated(ctx, ec, migrationID)
if err != nil {
return nil, err
}
for i, d := range dss {
i := i
stages = append(stages, stage.Stage[any]{
ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name),
SuccessMsg: fmt.Sprintf("Migrated %s: %s ...", d.Type, d.Name),
FailureMsg: fmt.Sprintf("Failed migrating %s: %s ...", d.Type, d.Name),
Func: func(ct context.Context, status stage.Statuser[any]) (any, error) {
for {
if ctx.Err() != nil {
return nil, err
}
generalStatus, err := readMigrationStatus(ctx, statusMap, migrationID)
if err != nil {
return nil, err
}
if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, generalStatus.Status) {
err = saveMemberLogs(ctx, ec, ci, migrationID)
if err != nil {
return nil, err
}
var name string
if reportOutputDir == "" {
name = fmt.Sprintf("migration_report_%s.txt", migrationID)
}
err = saveReportToFile(name, generalStatus.Report)
if err != nil {
return nil, err
}
}
switch generalStatus.Status {
case StatusComplete:
return nil, nil
case StatusFailed:
return nil, errors.New(generalStatus.Errors[0]) //TODO
case StatusCanceled, StatusCanceling:
return nil, errors2.ErrUserCancelled
}
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return nil, err
}
iter, err := res.Iterator()
if err != nil {
return nil, err
}
if iter.HasNext() {
row, err := iter.Next()
if err != nil {
return nil, err
}
rowStr, err := row.Get(0)
if err != nil {
return nil, err
}
var m MigrationStatusRow
if err = json.Unmarshal(rowStr.(serialization.JSON), &m); err != nil {
return nil, err
}
status.SetProgress(m.CompletionPercentage)
switch m.Status {
case StatusComplete:
return nil, nil
case StatusFailed:
return nil, stage.IgnoreError(errors.New(m.Error))
case StatusCanceled:
return nil, errors2.ErrUserCancelled
}
}
}
},
})
}
return stages, nil
}

func dataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, migrationID string) ([]DataStructureInfo, error) {
var dss []DataStructureInfo
ci, err := ec.ClientInternal(ctx)
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return nil, err
}
it, err := res.Iterator()
if err != nil {
return nil, err
}
if it.HasNext() {
row, err := it.Next()
if err != nil {
return nil, err
}
r, err := row.Get(0)
var status MigrationStatusTotal
if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil {
return nil, err
}
for _, m := range status.Migrations {
dss = append(dss, DataStructureInfo{
Name: m.Name,
Type: m.Type,
})
}
}
return dss, nil
}

func saveMemberLogs(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) error {
for _, m := range ci.OrderedMembers() {
l, err := ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String())
if err != nil {
return err
}
logs, err := l.GetAll(ctx)
if err != nil {
return err
}
for _, line := range logs {
ec.Logger().Debugf(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string)))
}
}
return nil
}

func saveReportToFile(fileName, report string) error {
f, err := os.Create(fmt.Sprintf(fileName))
if err != nil {
return err
}
defer f.Close()
return os.WriteFile(fileName, []byte(report), 0600)
}

func waitForMigrationToBeCreated(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error {
for {
statusMap, err := ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return err
}
ok, err := statusMap.ContainsKey(ctx, migrationID)
if err != nil {
return err
}
if ok {
return nil
}
}
}
19 changes: 11 additions & 8 deletions base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-commandline-client/internal/prompt"
"github.com/hazelcast/hazelcast-go-client"
)

type StartCmd struct{}
Expand Down Expand Up @@ -47,17 +46,21 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
var updateTopic *hazelcast.Topic
sts := NewStartStages(ec.Logger(), updateTopic, MakeMigrationID(), ec.GetStringArg(argDMTConfig), ec.Props().GetString(flagOutputDir))
if !sts.topicListenerID.Default() && sts.updateTopic != nil {
if err := sts.updateTopic.RemoveListener(ctx, sts.topicListenerID); err != nil {
return err
}
}
mID := MakeMigrationID()
sts := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig))
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if _, err := stage.Execute(ctx, ec, any(nil), sp); err != nil {
return err
}
mStages, err := migrationStages(ctx, ec, mID, ec.Props().GetString(flagOutputDir), sts.statusMap)
if err != nil {
return err
}
mp := stage.NewFixedProvider(mStages...)
if _, err := stage.Execute(ctx, ec, any(nil), mp); err != nil {
return err
}

ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary("OK Migration completed successfully.")
return nil
Expand Down
Loading

0 comments on commit 476be4e

Please sign in to comment.