Skip to content

Commit

Permalink
fix Yuce's refactor request
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Nov 6, 2023
1 parent 465a2e0 commit c53f607
Showing 1 changed file with 48 additions and 45 deletions.
93 changes: 48 additions & 45 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/sql"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors"
Expand All @@ -27,6 +28,8 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w

var migrationStatusNotFoundErr = fmt.Errorf("migration status not found")

var migrationReportNotFoundErr = "migration report cannot be found: %w"

var progressMsg = "Migrating %s: %s"

func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) {
Expand Down Expand Up @@ -147,8 +150,12 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
rr, err := r.Get(0)
if err != nil {
return nil, err
}
var status OverallMigrationStatus
if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil {
if err = json.Unmarshal(rr.(serialization.JSON), &status); err != nil {
return nil, err
}
if len(status.Migrations) == 0 {
Expand Down Expand Up @@ -243,70 +250,70 @@ func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, mig
if err != nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(r.(serialization.JSON)), `"`), `"`), nil
rr, err := r.Get(0)
if err != nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(rr.(serialization.JSON)), `"`), `"`), nil
}

func fetchOverallProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (time.Duration, float32, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.remainingTime'), JSON_QUERY(this, '$.completionPercentage') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return 0, 0, err
}
it, err := res.Iterator()
remainingTime, err := r.Get(0)
if err != nil {
return 0, 0, err
}
if it.HasNext() {
// single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return 0, 0, err
}
remainingTime, err := row.Get(0)
if err != nil {
return 0, 0, err
}
completionPercentage, err := row.Get(1)
if err != nil {
return 0, 0, err
}
if completionPercentage == nil {
return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName)
}
if remainingTime == nil {
return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName)
}
rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64)
if err != nil {
return 0, 0, err
}
cpStr := completionPercentage.(serialization.JSON).String()
cp, err := strconv.ParseFloat(cpStr, 32)
if err != nil {
return 0, 0, err
}
return time.Duration(rt) * time.Millisecond, float32(cp), nil
completionPercentage, err := r.Get(1)
if err != nil {
return 0, 0, err
}
if completionPercentage == nil {
return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName)
}
return 0, 0, errors.New("no rows found")
if remainingTime == nil {
return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName)
}
rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64)
if err != nil {
return 0, 0, err
}
cpStr := completionPercentage.(serialization.JSON).String()
cp, err := strconv.ParseFloat(cpStr, 32)
if err != nil {
return 0, 0, err
}
return time.Duration(rt) * time.Millisecond, float32(cp), nil
}

func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", fmt.Errorf("migration report cannot be found: %w", err)
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
return strings.ReplaceAll(string(r.(serialization.JSON)), `\"`, ``), nil
rr, err := r.Get(0)
if err != nil {
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
return strings.ReplaceAll(string(rr.(serialization.JSON)), `\"`, ``), nil
}

func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
row, err := querySingleRow(ctx, ci, q)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", err
}
rr, err := r.Get(0)
if err != nil {
return "", err
}
var errs []string
err = json.Unmarshal(row.(serialization.JSON), &errs)
err = json.Unmarshal(rr.(serialization.JSON), &errs)
if err != nil {
return "", err
}
Expand All @@ -326,7 +333,7 @@ func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.C
return nil
}

func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (any, error) {
func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (sql.Row, error) {
res, err := ci.Client().SQL().Execute(ctx, query)
if err != nil {
return nil, err
Expand All @@ -341,11 +348,7 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str
if err != nil {
return nil, err
}
r, err := row.Get(0)
if err != nil {
return "", err
}
return r, nil
return row, nil
}
return nil, errors.New("no rows found")
}

0 comments on commit c53f607

Please sign in to comment.