Skip to content

Commit

Permalink
[CLC-311]: Add status command to DMT
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 7, 2023
1 parent f79befd commit 8b68b51
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 18 deletions.
7 changes: 4 additions & 3 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package migration

const (
StartQueueName = "__datamigration_start_queue"
StatusMapEntryName = "status"
updateTopic = "__datamigration_updates_"
StartQueueName = "__datamigration_start_queue"
StatusMapEntryName = "status"
updateTopic = "__datamigration_updates_"
MigrationsInProgressList = "__datamigrations_in_progress"
)
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
sts := NewStages(makeMigrationID(), ec.Args()[0])
sts := NewStartStages(MakeMigrationID(), ec.Args()[0])
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
Expand Down
41 changes: 41 additions & 0 deletions base/commands/migration/migration_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package migration

import (
"context"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
)

type StatusCmd struct{}

func (s StatusCmd) Unwrappable() {}

func (s StatusCmd) Init(cc plug.InitContext) error {
cc.SetCommandUsage("status [flags]")
cc.SetCommandGroup("migration")
help := "Get status of the data migration"
cc.SetCommandHelp(help, help)
cc.SetPositionalArgCount(0, 0)
return nil
}

func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) error {
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0
(c) 2023 Hazelcast, Inc.
`)
sts := NewStatusStages()
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
}
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary("OK")
return nil
}

func init() {
check.Must(plug.Registry.RegisterCommand("status", &StatusCmd{}))
}
37 changes: 24 additions & 13 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
)

type Stages struct {
type StartStages struct {
migrationID string
configDir string
ci *hazelcast.ClientInternal
Expand All @@ -33,17 +33,17 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w",
context.DeadlineExceeded)

func NewStages(migrationID, configDir string) *Stages {
func NewStartStages(migrationID, configDir string) *StartStages {
if migrationID == "" {
panic("migrationID is required")
}
return &Stages{
return &StartStages{
migrationID: migrationID,
configDir: configDir,
}
}

func (st *Stages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
return []stage.Stage{
{
ProgressMsg: "Connecting to the migration cluster",
Expand All @@ -66,7 +66,7 @@ func (st *Stages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage
}
}

func (st *Stages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
func (st *StartStages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
return func(status stage.Statuser) error {
var err error
st.ci, err = ec.ClientInternal(ctx)
Expand All @@ -91,11 +91,11 @@ func (st *Stages) connectStage(ctx context.Context, ec plug.ExecContext) func(st
}
}

func (st *Stages) topicListener(event *hazelcast.MessagePublished) {
func (st *StartStages) topicListener(event *hazelcast.MessagePublished) {
st.updateMessageChan <- event.Value.(UpdateMessage)
}

func (st *Stages) startStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
return func(stage.Statuser) error {
if err := st.statusMap.Delete(ctx, StatusMapEntryName); err != nil {
return err
Expand Down Expand Up @@ -146,7 +146,7 @@ func (st *Stages) startStage(ctx context.Context, ec plug.ExecContext) func(stag
}
}

func (st *Stages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
for {
Expand Down Expand Up @@ -185,7 +185,7 @@ func (st *Stages) migrateStage(ctx context.Context, ec plug.ExecContext) func(st
}
}

func (st *Stages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
func (st *StartStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
v, err := st.statusMap.Get(ctx, StatusMapEntryName)
if err != nil {
return migrationStatusNone, err
Expand Down Expand Up @@ -227,10 +227,21 @@ const (
)

type MigrationStatus struct {
Status status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
Status status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
Migrations []Migration `json:"migrations"`
}

type Migration struct {
Name string `json:"name"`
Type string `json:"type"`
Status status `json:"status"`
StartTimestamp time.Time `json:"startTimestamp"`
EntriesMigrated int `json:"entriesMigrated"`
TotalEntries int `json:"totalEntries"`
CompletionPercentage float64 `json:"completionPercentage"`
}

var migrationStatusNone = MigrationStatus{
Expand Down
178 changes: 178 additions & 0 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package migration

import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/output"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
serialization2 "github.com/hazelcast/hazelcast-commandline-client/internal/serialization"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/types"
)

type StatusStages struct {
migrationID string
ci *hazelcast.ClientInternal
migrationsInProgressList *hazelcast.List
statusMap *hazelcast.Map
updateTopic *hazelcast.Topic
topicListenerID types.UUID
updateMessageChan chan UpdateMessage
}

func NewStatusStages() *StatusStages {
return &StatusStages{}
}

func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
return []stage.Stage{
{
ProgressMsg: "Connecting to the migration cluster",
SuccessMsg: "Connected to the migration cluster",
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ctx, ec),
},
{
ProgressMsg: "Fetching migration status",
SuccessMsg: "Fetched migration status",
FailureMsg: "Could not fetch migration status",
Func: st.fetchStage(ctx, ec),
},
}
}

type MigrationInProgress struct {
MigrationID string `json:"migrationId"`
}

func (st *StatusStages) connectStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(status stage.Statuser) error {
var err error
st.ci, err = ec.ClientInternal(ctx)
if err != nil {
return err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return err
}
m := all[0].(MigrationInProgress)
st.migrationID = m.MigrationID
st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID))
if err != nil {
return err
}
st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID))
if err != nil {
return err
}
st.updateMessageChan = make(chan UpdateMessage)
_, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
return err
}
}

func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
for {
select {
case msg := <-st.updateMessageChan:
ms, err := st.readMigrationStatus(ctx)
if err != nil {
return fmt.Errorf("reading status: %w", err)
}
if slices.Contains([]status{StatusComplete, StatusFailed, statusCanceled}, msg.Status) {
ec.PrintlnUnnecessary(msg.Message)
ec.PrintlnUnnecessary(ms.Report)
if len(ms.Errors) > 0 {
ec.PrintlnUnnecessary(fmt.Sprintf("migration failed with following error(s): %s", strings.Join(ms.Errors, "\n")))
}
if len(ms.Migrations) > 0 {
var rows []output.Row
for _, m := range ms.Migrations {
rows = append(rows, output.Row{
output.Column{
Name: "Name",
Type: serialization2.TypeString,
Value: m.Name,
},
output.Column{
Name: "Type",
Type: serialization2.TypeString,
Value: m.Type,
},
output.Column{
Name: "Status",
Type: serialization2.TypeString,
Value: string(m.Status),
},
output.Column{
Name: "Start Timestamp",
Type: serialization2.TypeJavaLocalDateTime,
Value: types.LocalDateTime(m.StartTimestamp),
},
output.Column{
Name: "Entries Migrated",
Type: serialization2.TypeInt32,
Value: int32(m.EntriesMigrated),
},
output.Column{
Name: "Total Entries",
Type: serialization2.TypeInt32,
Value: int32(m.TotalEntries),
},
output.Column{
Name: "Completion Percentage",
Type: serialization2.TypeFloat32,
Value: float32(m.CompletionPercentage),
},
})
}
return ec.AddOutputRows(ctx, rows...)
}
return nil
} else {
ec.PrintlnUnnecessary(msg.Message)
}
}
}
}
}

func (st *StatusStages) topicListener(event *hazelcast.MessagePublished) {
st.updateMessageChan <- event.Value.(UpdateMessage)
}

func (st *StatusStages) readMigrationStatus(ctx context.Context) (MigrationStatus, error) {
v, err := st.statusMap.Get(ctx, StatusMapEntryName)
if err != nil {
return migrationStatusNone, err
}
if v == nil {
return migrationStatusNone, nil
}
var b []byte
if vv, ok := v.(string); ok {
b = []byte(vv)
} else if vv, ok := v.(serialization.JSON); ok {
b = vv
} else {
return migrationStatusNone, fmt.Errorf("invalid status value")
}
var ms MigrationStatus
if err := json.Unmarshal(b, &ms); err != nil {
return migrationStatusNone, fmt.Errorf("unmarshaling status: %w", err)
}
return ms, nil
}
Loading

0 comments on commit 8b68b51

Please sign in to comment.