Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 7, 2023
1 parent 8b68b51 commit 3f76b0f
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 139 deletions.
65 changes: 65 additions & 0 deletions base/commands/migration/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package migration

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type MigrationStatus struct {
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
Migrations []Migration `json:"migrations"`
}

type Migration struct {
Name string `json:"name"`
Type string `json:"type"`
Status Status `json:"status"`
StartTimestamp time.Time `json:"startTimestamp"`
EntriesMigrated int `json:"entriesMigrated"`
TotalEntries int `json:"totalEntries"`
CompletionPercentage float64 `json:"completionPercentage"`
}

var migrationStatusNone = MigrationStatus{
Status: StatusNone,
Logs: nil,
Errors: nil,
Report: "",
}

type UpdateMessage struct {
Status Status `json:"status"`
CompletionPercentage float32 `json:"completionPercentage"`
Message string `json:"message"`
}

func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map) (MigrationStatus, error) {
v, err := statusMap.Get(ctx, StatusMapEntryName)
if err != nil {
return migrationStatusNone, err
}
if v == nil {
return migrationStatusNone, nil
}
var b []byte
if vv, ok := v.(string); ok {
b = []byte(vv)
} else if vv, ok := v.(serialization.JSON); ok {
b = vv
} else {
return migrationStatusNone, fmt.Errorf("invalid status value")
}
var ms MigrationStatus
if err := json.Unmarshal(b, &ms); err != nil {
return migrationStatusNone, fmt.Errorf("unmarshaling status: %w", err)
}
return ms, nil
}
12 changes: 11 additions & 1 deletion base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ package migration
const (
StartQueueName = "__datamigration_start_queue"
StatusMapEntryName = "status"
updateTopic = "__datamigration_updates_"
UpdateTopic = "__datamigration_updates_"
MigrationsInProgressList = "__datamigrations_in_progress"
)

type Status string

const (
StatusNone Status = ""
StatusComplete Status = "COMPLETED"
StatusCanceled Status = "CANCELED"
StatusFailed Status = "FAILED"
StatusInProgress Status = "IN_PROGRESS"
)
2 changes: 2 additions & 0 deletions base/commands/migration/dummy.go
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
package migration

// This file exists only for compilation
109 changes: 21 additions & 88 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,18 @@ func (st *StartStages) topicListener(event *hazelcast.MessagePublished) {

func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
return func(stage.Statuser) error {
if err := st.statusMap.Delete(ctx, StatusMapEntryName); err != nil {
return err
}
var cb ConfigBundle
cb.MigrationID = st.migrationID
if err := cb.Walk(st.configDir); err != nil {
return err
}
b, err := json.Marshal(cb)
cb, err := makeConfigBundle(st.configDir, st.migrationID)
if err != nil {
return err
}
if err = st.startQueue.Put(ctx, serialization.JSON(b)); err != nil {
if err = st.startQueue.Put(ctx, cb); err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
msg := <-st.updateMessageChan // read the first message
if slices.Contains([]status{StatusComplete, StatusFailed, statusCanceled}, msg.Status) {
ms, err := st.readMigrationStatus(ctx)
if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) {
ms, err := readMigrationStatus(ctx, st.statusMap)
if ctx.Err() != nil {
if errors.Is(err, context.DeadlineExceeded) {
return timeoutErr
Expand All @@ -134,7 +126,7 @@ func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func
switch ms.Status {
case StatusComplete:
return nil
case statusCanceled:
case StatusCanceled:
return clcerrors.ErrUserCancelled
case StatusFailed:
return fmt.Errorf("migration failed with following error(s): %s", strings.Join(ms.Errors, "\n"))
Expand All @@ -146,14 +138,27 @@ func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func
}
}

func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) {
var cb ConfigBundle
cb.MigrationID = migrationID
if err := cb.Walk(configDir); err != nil {
return nil, err
}
b, err := json.Marshal(cb)
if err != nil {
return nil, err
}
return b, nil
}

func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
for {
select {
case msg := <-st.updateMessageChan:
if slices.Contains([]status{StatusComplete, StatusFailed, statusCanceled}, msg.Status) {
ms, err := st.readMigrationStatus(ctx)
if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) {
ms, err := readMigrationStatus(ctx, st.statusMap)
if err != nil {
return fmt.Errorf("reading status: %w", err)
}
Expand All @@ -165,7 +170,7 @@ func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) fu
switch ms.Status {
case StatusComplete:
return nil
case statusCanceled:
case StatusCanceled:
return clcerrors.ErrUserCancelled
case StatusFailed:
return fmt.Errorf("migration failed with following error(s): %s", strings.Join(ms.Errors, "\n"))
Expand All @@ -184,75 +189,3 @@ func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) fu
}
}
}

func (st *StartStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
v, err := st.statusMap.Get(ctx, StatusMapEntryName)
if err != nil {
return migrationStatusNone, err
}
if v == nil {
return migrationStatusNone, nil
}
var b []byte
if vv, ok := v.(string); ok {
b = []byte(vv)
} else if vv, ok := v.(serialization.JSON); ok {
b = vv
} else {
return migrationStatusNone, fmt.Errorf("invalid status value")
}
var ms MigrationStatus
if err := json.Unmarshal(b, &ms); err != nil {
return migrationStatusNone, fmt.Errorf("unmarshaling status: %w", err)
}
return ms, nil
}

func MakeStatusMapName(migrationID string) string {
return "__datamigration_" + migrationID
}

func MakeUpdateTopicName(migrationID string) string {
return updateTopic + migrationID
}

type status string

const (
statusNone status = ""
StatusComplete status = "COMPLETED"
statusCanceled status = "CANCELED"
StatusFailed status = "FAILED"
StatusInProgress status = "IN_PROGRESS"
)

type MigrationStatus struct {
Status status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
Migrations []Migration `json:"migrations"`
}

type Migration struct {
Name string `json:"name"`
Type string `json:"type"`
Status status `json:"status"`
StartTimestamp time.Time `json:"startTimestamp"`
EntriesMigrated int `json:"entriesMigrated"`
TotalEntries int `json:"totalEntries"`
CompletionPercentage float64 `json:"completionPercentage"`
}

var migrationStatusNone = MigrationStatus{
Status: statusNone,
Logs: nil,
Errors: nil,
Report: "",
}

type UpdateMessage struct {
Status status `json:"status"`
CompletionPercentage float32 `json:"completionPercentage"`
Message string `json:"message"`
}
29 changes: 2 additions & 27 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package migration

import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
serialization2 "github.com/hazelcast/hazelcast-commandline-client/internal/serialization"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/types"
)

Expand Down Expand Up @@ -88,11 +86,11 @@ func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) fun
for {
select {
case msg := <-st.updateMessageChan:
ms, err := st.readMigrationStatus(ctx)
ms, err := readMigrationStatus(ctx, st.statusMap)
if err != nil {
return fmt.Errorf("reading status: %w", err)
}
if slices.Contains([]status{StatusComplete, StatusFailed, statusCanceled}, msg.Status) {
if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) {
ec.PrintlnUnnecessary(msg.Message)
ec.PrintlnUnnecessary(ms.Report)
if len(ms.Errors) > 0 {
Expand Down Expand Up @@ -153,26 +151,3 @@ func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) fun
func (st *StatusStages) topicListener(event *hazelcast.MessagePublished) {
st.updateMessageChan <- event.Value.(UpdateMessage)
}

func (st *StatusStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
v, err := st.statusMap.Get(ctx, StatusMapEntryName)
if err != nil {
return migrationStatusNone, err
}
if v == nil {
return migrationStatusNone, nil
}
var b []byte
if vv, ok := v.(string); ok {
b = []byte(vv)
} else if vv, ok := v.(serialization.JSON); ok {
b = vv
} else {
return migrationStatusNone, fmt.Errorf("invalid status value")
}
var ms MigrationStatus
if err := json.Unmarshal(b, &ms); err != nil {
return migrationStatusNone, fmt.Errorf("unmarshaling status: %w", err)
}
return ms, nil
}
36 changes: 13 additions & 23 deletions base/commands/migration/status_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -70,41 +71,30 @@ func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) stri
}

func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) {
m := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID)))
t := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID)))
setState(ctx, t, m, migration.StatusInProgress, "first message")
setState(ctx, t, m, migration.StatusFailed, "last message")

}

func setState(ctx context.Context, updateTopic *hazelcast.Topic, statusMap *hazelcast.Map, status migration.Status, msg string) {
startTime := MustValue(time.Parse(time.RFC3339, "2023-01-01T00:00:00Z"))
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID)))
b := MustValue(json.Marshal(migration.MigrationStatus{
Status: migration.StatusInProgress,
Status: status,
Report: "status report",
Migrations: []migration.Migration{
{
Name: "imap5",
Type: "IMap",
Status: migration.StatusInProgress,
StartTimestamp: startTime,
EntriesMigrated: 121,
TotalEntries: 1000,
CompletionPercentage: 12.1,
},
},
}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b)))
topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID)))
Must(topic.Publish(ctx, migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message"}))
b = MustValue(json.Marshal(migration.MigrationStatus{
Status: migration.StatusFailed,
Report: "status report",
Migrations: []migration.Migration{
{
Name: "imap5",
Type: "IMap",
Status: migration.StatusFailed,
Status: status,
StartTimestamp: startTime,
EntriesMigrated: 141,
TotalEntries: 1000,
CompletionPercentage: 14.1,
},
},
}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b))) // update status map
Must(topic.Publish(ctx, migration.UpdateMessage{Status: migration.StatusFailed, Message: "last message"}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b)))
Must(updateTopic.Publish(ctx, migration.UpdateMessage{Status: status, Message: msg}))
}
8 changes: 8 additions & 0 deletions base/commands/migration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,11 @@ func readPathAsString(path string) (string, error) {
func MakeMigrationID() string {
return types.NewUUID().String()
}

func MakeStatusMapName(migrationID string) string {
return "__datamigration_" + migrationID
}

func MakeUpdateTopicName(migrationID string) string {
return UpdateTopic + migrationID
}

0 comments on commit 3f76b0f

Please sign in to comment.