diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go
index ec63bd48..3f832219 100644
--- a/base/commands/migration/const.go
+++ b/base/commands/migration/const.go
@@ -1,7 +1,8 @@
 package migration
 
 const (
-	StartQueueName     = "__datamigration_start_queue"
-	StatusMapEntryName = "status"
-	updateTopic        = "__datamigration_updates_"
+	StartQueueName           = "__datamigration_start_queue"
+	StatusMapEntryName       = "status"
+	updateTopic              = "__datamigration_updates_"
+	MigrationsInProgressList = "__datamigrations_in_progress"
 )
diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go
index 95136cb9..988991d9 100644
--- a/base/commands/migration/migration_start.go
+++ b/base/commands/migration/migration_start.go
@@ -45,7 +45,7 @@ Selected data structures in the source cluster will be migrated to the target cl
 		}
 	}
 	ec.PrintlnUnnecessary("")
-	sts := NewStages(makeMigrationID(), ec.Args()[0])
+	sts := NewStartStages(MakeMigrationID(), ec.Args()[0])
 	sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
 	if err := stage.Execute(ctx, ec, sp); err != nil {
 		return err
diff --git a/base/commands/migration/migration_status.go b/base/commands/migration/migration_status.go
new file mode 100644
index 00000000..964f11d7
--- /dev/null
+++ b/base/commands/migration/migration_status.go
@@ -0,0 +1,41 @@
+package migration
+
+import (
+	"context"
+
+	"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
+	"github.com/hazelcast/hazelcast-commandline-client/internal/check"
+	"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
+)
+
+type StatusCmd struct{}
+
+func (s StatusCmd) Unwrappable() {}
+
+func (s StatusCmd) Init(cc plug.InitContext) error {
+	cc.SetCommandUsage("status [flags]")
+	cc.SetCommandGroup("migration")
+	help := "Get status of the data migration"
+	cc.SetCommandHelp(help, help)
+	cc.SetPositionalArgCount(0, 0)
+	return nil
+}
+
+func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) error {
+	ec.PrintlnUnnecessary("")
+	ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0
+(c) 2023 Hazelcast, Inc.
+`)
+	sts := NewStatusStages()
+	sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
+	if err := stage.Execute(ctx, ec, sp); err != nil {
+		return err
+	}
+	ec.PrintlnUnnecessary("")
+	ec.PrintlnUnnecessary("OK")
+	return nil
+}
+
+func init() {
+	check.Must(plug.Registry.RegisterCommand("status", &StatusCmd{}))
+}
diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go
index 4d666919..cab07046 100644
--- a/base/commands/migration/start_stages.go
+++ b/base/commands/migration/start_stages.go
@@ -18,7 +18,7 @@ import (
 	"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
 )
 
-type Stages struct {
+type StartStages struct {
 	migrationID       string
 	configDir         string
 	ci                *hazelcast.ClientInternal
@@ -33,17 +33,17 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w
 	"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w",
 	context.DeadlineExceeded)
 
-func NewStages(migrationID, configDir string) *Stages {
+func NewStartStages(migrationID, configDir string) *StartStages {
 	if migrationID == "" {
 		panic("migrationID is required")
 	}
-	return &Stages{
+	return &StartStages{
 		migrationID: migrationID,
 		configDir:   configDir,
 	}
 }
 
-func (st *Stages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
+func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
 	return []stage.Stage{
 		{
 			ProgressMsg: "Connecting to the migration cluster",
@@ -66,7 +66,7 @@ func (st *Stages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage
 	}
 }
 
-func (st *Stages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
+func (st *StartStages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
 	return func(status stage.Statuser) error {
 		var err error
 		st.ci, err = ec.ClientInternal(ctx)
@@ -91,11 +91,11 @@ func (st *Stages) connectStage(ctx context.Context, ec plug.ExecContext) func(st
 	}
 }
 
-func (st *Stages) topicListener(event *hazelcast.MessagePublished) {
+func (st *StartStages) topicListener(event *hazelcast.MessagePublished) {
 	st.updateMessageChan <- event.Value.(UpdateMessage)
 }
 
-func (st *Stages) startStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
+func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
 	return func(stage.Statuser) error {
 		if err := st.statusMap.Delete(ctx, StatusMapEntryName); err != nil {
 			return err
@@ -146,7 +146,7 @@ func (st *Stages) startStage(ctx context.Context, ec plug.ExecContext) func(stag
 	}
 }
 
-func (st *Stages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
+func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
 	return func(stage.Statuser) error {
 		defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
 		for {
@@ -185,7 +185,7 @@ func (st *Stages) migrateStage(ctx context.Context, ec plug.ExecContext) func(st
 	}
 }
 
-func (st *Stages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
+func (st *StartStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
 	v, err := st.statusMap.Get(ctx, StatusMapEntryName)
 	if err != nil {
 		return migrationStatusNone, err
@@ -227,10 +227,21 @@ const (
 )
 
 type MigrationStatus struct {
-	Status status   `json:"status"`
-	Logs   []string `json:"logs"`
-	Errors []string `json:"errors"`
-	Report string   `json:"report"`
+	Status     status      `json:"status"`
+	Logs       []string    `json:"logs"`
+	Errors     []string    `json:"errors"`
+	Report     string      `json:"report"`
+	Migrations []Migration `json:"migrations"`
+}
+
+type Migration struct {
+	Name                 string    `json:"name"`
+	Type                 string    `json:"type"`
+	Status               status    `json:"status"`
+	StartTimestamp       time.Time `json:"startTimestamp"`
+	EntriesMigrated      int       `json:"entriesMigrated"`
+	TotalEntries         int       `json:"totalEntries"`
+	CompletionPercentage float64   `json:"completionPercentage"`
 }
 
 var migrationStatusNone = MigrationStatus{
diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go
new file mode 100644
index 00000000..5f158432
--- /dev/null
+++ b/base/commands/migration/status_stages.go
@@ -0,0 +1,178 @@
+package migration
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"slices"
+	"strings"
+
+	"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
+	"github.com/hazelcast/hazelcast-commandline-client/internal/output"
+	"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
+	serialization2 "github.com/hazelcast/hazelcast-commandline-client/internal/serialization"
+	"github.com/hazelcast/hazelcast-go-client"
+	"github.com/hazelcast/hazelcast-go-client/serialization"
+	"github.com/hazelcast/hazelcast-go-client/types"
+)
+
+type StatusStages struct {
+	migrationID              string
+	ci                       *hazelcast.ClientInternal
+	migrationsInProgressList *hazelcast.List
+	statusMap                *hazelcast.Map
+	updateTopic              *hazelcast.Topic
+	topicListenerID          types.UUID
+	updateMessageChan        chan UpdateMessage
+}
+
+func NewStatusStages() *StatusStages {
+	return &StatusStages{}
+}
+
+func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
+	return []stage.Stage{
+		{
+			ProgressMsg: "Connecting to the migration cluster",
+			SuccessMsg:  "Connected to the migration cluster",
+			FailureMsg:  "Could not connect to the migration cluster",
+			Func:        st.connectStage(ctx, ec),
+		},
+		{
+			ProgressMsg: "Fetching migration status",
+			SuccessMsg:  "Fetched migration status",
+			FailureMsg:  "Could not fetch migration status",
+			Func:        st.fetchStage(ctx, ec),
+		},
+	}
+}
+
+type MigrationInProgress struct {
+	MigrationID string `json:"migrationId"`
+}
+
+func (st *StatusStages) connectStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
+	return func(status stage.Statuser) error {
+		var err error
+		st.ci, err = ec.ClientInternal(ctx)
+		if err != nil {
+			return err
+		}
+		st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
+		if err != nil {
+			return err
+		}
+		all, err := st.migrationsInProgressList.GetAll(ctx)
+		if err != nil {
+			return err
+		}
+		m := all[0].(MigrationInProgress)
+		st.migrationID = m.MigrationID
+		st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID))
+		if err != nil {
+			return err
+		}
+		st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID))
+		if err != nil {
+			return err
+		}
+		st.updateMessageChan = make(chan UpdateMessage)
+		st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
+		return err
+	}
+}
+
+func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
+	return func(stage.Statuser) error {
+		defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
+		for {
+			select {
+			case msg := <-st.updateMessageChan:
+				ms, err := st.readMigrationStatus(ctx)
+				if err != nil {
+					return fmt.Errorf("reading status: %w", err)
+				}
+				if slices.Contains([]status{StatusComplete, StatusFailed, statusCanceled}, msg.Status) {
+					ec.PrintlnUnnecessary(msg.Message)
+					ec.PrintlnUnnecessary(ms.Report)
+					if len(ms.Errors) > 0 {
+						ec.PrintlnUnnecessary(fmt.Sprintf("migration failed with following error(s): %s", strings.Join(ms.Errors, "\n")))
+					}
+					if len(ms.Migrations) > 0 {
+						var rows []output.Row
+						for _, m := range ms.Migrations {
+							rows = append(rows, output.Row{
+								output.Column{
+									Name:  "Name",
+									Type:  serialization2.TypeString,
+									Value: m.Name,
+								},
+								output.Column{
+									Name:  "Type",
+									Type:  serialization2.TypeString,
+									Value: m.Type,
+								},
+								output.Column{
+									Name:  "Status",
+									Type:  serialization2.TypeString,
+									Value: string(m.Status),
+								},
+								output.Column{
+									Name:  "Start Timestamp",
+									Type:  serialization2.TypeJavaLocalDateTime,
+									Value: types.LocalDateTime(m.StartTimestamp),
+								},
+								output.Column{
+									Name:  "Entries Migrated",
+									Type:  serialization2.TypeInt32,
+									Value: int32(m.EntriesMigrated),
+								},
+								output.Column{
+									Name:  "Total Entries",
+									Type:  serialization2.TypeInt32,
+									Value: int32(m.TotalEntries),
+								},
+								output.Column{
+									Name:  "Completion Percentage",
+									Type:  serialization2.TypeFloat32,
+									Value: float32(m.CompletionPercentage),
+								},
+							})
+						}
+						return ec.AddOutputRows(ctx, rows...)
+					}
+					return nil
+				} else {
+					ec.PrintlnUnnecessary(msg.Message)
+				}
+			}
+		}
+	}
+}
+
+func (st *StatusStages) topicListener(event *hazelcast.MessagePublished) {
+	st.updateMessageChan <- event.Value.(UpdateMessage)
+}
+
+func (st *StatusStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
+	v, err := st.statusMap.Get(ctx, StatusMapEntryName)
+	if err != nil {
+		return migrationStatusNone, err
+	}
+	if v == nil {
+		return migrationStatusNone, nil
+	}
+	var b []byte
+	if vv, ok := v.(string); ok {
+		b = []byte(vv)
+	} else if vv, ok := v.(serialization.JSON); ok {
+		b = vv
+	} else {
+		return migrationStatusNone, fmt.Errorf("invalid status value")
+	}
+	var ms MigrationStatus
+	if err := json.Unmarshal(b, &ms); err != nil {
+		return migrationStatusNone, fmt.Errorf("unmarshaling status: %w", err)
+	}
+	return ms, nil
+}
diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go
new file mode 100644
index 00000000..67f6b8b5
--- /dev/null
+++ b/base/commands/migration/status_stages_it_test.go
@@ -0,0 +1,110 @@
+//go:build migration
+
+package migration_test
+
+import (
+	"context"
+	"encoding/json"
+	"sync"
+	"testing"
+	"time"
+
+	_ "github.com/hazelcast/hazelcast-commandline-client/base"
+	_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
+	"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
+	. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/stretchr/testify/require" +) + +func TestStatus(t *testing.T) { + testCases := []struct { + name string + f func(t *testing.T) + }{ + {name: "status", f: statusTest}, + } + for _, tc := range testCases { + t.Run(tc.name, tc.f) + } +} + +func statusTest(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + mID := preStatusRunner(t, tcx, ctx) + var wg sync.WaitGroup + wg.Add(1) + go tcx.WithReset(func() { + defer wg.Done() + Must(tcx.CLC().Execute(ctx, "status")) + }) + time.Sleep(1 * time.Second) // give time to status command to register its topic listener + statusRunner(mID, tcx, ctx) + wg.Wait() + tcx.AssertStdoutContains(` +Hazelcast Data Migration Tool v5.3.0 +(c) 2023 Hazelcast, Inc. + + OK [1/2] Connected to the migration cluster. +first message +last message +status report +imap5 IMap FAILED 2023-01-01 00:00:00 141 1000 14.1 + OK [2/2] Fetched migration status. + +OK`) + }) +} + +func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) string { + mID := migration.MakeMigrationID() + l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) + ok := MustValue(l.Add(ctx, migration.MigrationInProgress{ + MigrationID: mID, + })) + require.Equal(t, true, ok) + return mID +} + +func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) { + startTime := MustValue(time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")) + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID))) + b := MustValue(json.Marshal(migration.MigrationStatus{ + Status: migration.StatusInProgress, + Report: "status report", + Migrations: []migration.Migration{ + { + Name: "imap5", + Type: "IMap", + Status: migration.StatusInProgress, + StartTimestamp: startTime, + EntriesMigrated: 121, + TotalEntries: 1000, + CompletionPercentage: 12.1, + }, + }, + })) + Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b))) + topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) + Must(topic.Publish(ctx, migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message"})) + b = MustValue(json.Marshal(migration.MigrationStatus{ + Status: migration.StatusFailed, + Report: "status report", + Migrations: []migration.Migration{ + { + Name: "imap5", + Type: "IMap", + Status: migration.StatusFailed, + StartTimestamp: startTime, + EntriesMigrated: 141, + TotalEntries: 1000, + CompletionPercentage: 14.1, + }, + }, + })) + Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b))) // update status map + Must(topic.Publish(ctx, migration.UpdateMessage{Status: migration.StatusFailed, Message: "last message"})) +} diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 699d4796..7b9df423 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -119,6 +119,6 @@ func readPathAsString(path string) (string, error) { return string(b), nil } -func makeMigrationID() string { +func MakeMigrationID() string { return types.NewUUID().String() }