From 476be4e18f6ba53a59214b8b0cc6be359b58dd27 Mon Sep 17 00:00:00 2001 From: kmetin Date: Thu, 21 Sep 2023 14:25:03 +0300 Subject: [PATCH] query with sql --- base/commands/migration/common.go | 32 +-- base/commands/migration/const.go | 12 +- base/commands/migration/migration_stages.go | 184 ++++++++++++++++++ base/commands/migration/migration_start.go | 19 +- base/commands/migration/start_stages.go | 155 ++------------- .../migration/start_stages_it_test.go | 139 +++++-------- base/commands/migration/status_stages.go | 84 ++++---- .../migration/status_stages_it_test.go | 114 ----------- .../start/migration_success_completed.json | 69 +++++++ .../start/migration_success_failure.json | 70 +++++++ .../start/migration_success_initial.json | 70 +++++++ base/commands/migration/utils.go | 4 - 12 files changed, 533 insertions(+), 419 deletions(-) create mode 100644 base/commands/migration/migration_stages.go create mode 100644 base/commands/migration/testdata/start/migration_success_completed.json create mode 100644 base/commands/migration/testdata/start/migration_success_failure.json create mode 100644 base/commands/migration/testdata/start/migration_success_initial.json diff --git a/base/commands/migration/common.go b/base/commands/migration/common.go index a29cec7e..2c564a30 100644 --- a/base/commands/migration/common.go +++ b/base/commands/migration/common.go @@ -12,24 +12,32 @@ import ( "github.com/hazelcast/hazelcast-go-client/serialization" ) -type MigrationStatus struct { - Status Status `json:"status"` - Logs []string `json:"logs"` - Errors []string `json:"errors"` - Report string `json:"report"` - CompletionPercentage float32 `json:"completionPercentage"` +type MigrationStatusTotal struct { + Status Status `json:"status"` + Logs []string `json:"logs"` + Errors []string `json:"errors"` + Report string `json:"report"` + CompletionPercentage float32 `json:"completionPercentage"` + Migrations []MigrationStatusRow `json:"migrations"` } -type UpdateMessage struct { +type DataStructureInfo struct { + Name string + Type string +} + +type MigrationStatusRow struct { + Name string `json:"name"` + Type string `json:"type"` Status Status `json:"status"` - CompletionPercentage float32 `json:"completionPercentage"` - Message string `json:"message"` + CompletionPercentage float32 `json:"completion_percentage"` + Error string `json:"error"` } var ErrInvalidStatus = errors.New("invalid status value") -func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map) (*MigrationStatus, error) { - v, err := statusMap.Get(ctx, StatusMapEntryName) +func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map, migrationID string) (*MigrationStatusTotal, error) { + v, err := statusMap.Get(ctx, migrationID) //TODO: read only status with sql if err != nil { return nil, fmt.Errorf("getting status: %w", err) } @@ -44,7 +52,7 @@ func readMigrationStatus(ctx context.Context, statusMap *hazelcast.Map) (*Migrat } else { return nil, ErrInvalidStatus } - var ms MigrationStatus + var ms MigrationStatusTotal if err := json.Unmarshal(b, &ms); err != nil { return nil, fmt.Errorf("parsing migration status: %w", err) } diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index cf07e60a..5d9f684d 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -5,21 +5,21 @@ package migration const ( StartQueueName = "__datamigration_start_queue" StatusMapEntryName = "status" - StatusMapPrefix = "__datamigration_" + StatusMapName = "__datamigration_migrations" UpdateTopicPrefix = "__datamigration_updates_" DebugLogsListPrefix = "__datamigration_debug_logs_" MigrationsInProgressList = "__datamigrations_in_progress" - startQueueName = "__datamigration_start_queue" - statusMapEntryName = "status" - argDMTConfig = "dmtConfig" - argTitleDMTConfig = "DMT configuration" + startQueueName = "__datamigration_start_queue" + statusMapEntryName = "status" + argDMTConfig = "dmtConfig" + argTitleDMTConfig = "DMT configuration" ) type Status string const ( StatusStarted Status = "STARTED" - Canceling Status = "CANCELING" + StatusCanceling Status = "CANCELING" StatusComplete Status = "COMPLETED" StatusCanceled Status = "CANCELED" StatusFailed Status = "FAILED" diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go new file mode 100644 index 00000000..ee7633bf --- /dev/null +++ b/base/commands/migration/migration_stages.go @@ -0,0 +1,184 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + errors2 "github.com/hazelcast/hazelcast-commandline-client/errors" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" + "golang.org/x/exp/slices" +) + +func migrationStages(ctx context.Context, ec plug.ExecContext, migrationID, reportOutputDir string, statusMap *hazelcast.Map) ([]stage.Stage[any], error) { + ci, err := ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + if err = waitForMigrationToBeCreated(ctx, ci, migrationID); err != nil { + return nil, err + } + var stages []stage.Stage[any] + dss, err := dataStructuresToBeMigrated(ctx, ec, migrationID) + if err != nil { + return nil, err + } + for i, d := range dss { + i := i + stages = append(stages, stage.Stage[any]{ + ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name), + SuccessMsg: fmt.Sprintf("Migrated %s: %s ...", d.Type, d.Name), + FailureMsg: fmt.Sprintf("Failed migrating %s: %s ...", d.Type, d.Name), + Func: func(ct context.Context, status stage.Statuser[any]) (any, error) { + for { + if ctx.Err() != nil { + return nil, err + } + generalStatus, err := readMigrationStatus(ctx, statusMap, migrationID) + if err != nil { + return nil, err + } + if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, generalStatus.Status) { + err = saveMemberLogs(ctx, ec, ci, migrationID) + if err != nil { + return nil, err + } + var name string + if reportOutputDir == "" { + name = fmt.Sprintf("migration_report_%s.txt", migrationID) + } + err = saveReportToFile(name, generalStatus.Report) + if err != nil { + return nil, err + } + } + switch generalStatus.Status { + case StatusComplete: + return nil, nil + case StatusFailed: + return nil, errors.New(generalStatus.Errors[0]) //TODO + case StatusCanceled, StatusCanceling: + return nil, errors2.ErrUserCancelled + } + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + iter, err := res.Iterator() + if err != nil { + return nil, err + } + if iter.HasNext() { + row, err := iter.Next() + if err != nil { + return nil, err + } + rowStr, err := row.Get(0) + if err != nil { + return nil, err + } + var m MigrationStatusRow + if err = json.Unmarshal(rowStr.(serialization.JSON), &m); err != nil { + return nil, err + } + status.SetProgress(m.CompletionPercentage) + switch m.Status { + case StatusComplete: + return nil, nil + case StatusFailed: + return nil, stage.IgnoreError(errors.New(m.Error)) + case StatusCanceled: + return nil, errors2.ErrUserCancelled + } + } + } + }, + }) + } + return stages, nil +} + +func dataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, migrationID string) ([]DataStructureInfo, error) { + var dss []DataStructureInfo + ci, err := ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + it, err := res.Iterator() + if err != nil { + return nil, err + } + if it.HasNext() { + row, err := it.Next() + if err != nil { + return nil, err + } + r, err := row.Get(0) + var status MigrationStatusTotal + if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil { + return nil, err + } + for _, m := range status.Migrations { + dss = append(dss, DataStructureInfo{ + Name: m.Name, + Type: m.Type, + }) + } + } + return dss, nil +} + +func saveMemberLogs(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) error { + for _, m := range ci.OrderedMembers() { + l, err := ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String()) + if err != nil { + return err + } + logs, err := l.GetAll(ctx) + if err != nil { + return err + } + for _, line := range logs { + ec.Logger().Debugf(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string))) + } + } + return nil +} + +func saveReportToFile(fileName, report string) error { + f, err := os.Create(fmt.Sprintf(fileName)) + if err != nil { + return err + } + defer f.Close() + return os.WriteFile(fileName, []byte(report), 0600) +} + +func waitForMigrationToBeCreated(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { + for { + statusMap, err := ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return err + } + ok, err := statusMap.ContainsKey(ctx, migrationID) + if err != nil { + return err + } + if ok { + return nil + } + } +} diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 367d4632..b0e22f25 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -11,7 +11,6 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/internal/check" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-commandline-client/internal/prompt" - "github.com/hazelcast/hazelcast-go-client" ) type StartCmd struct{} @@ -47,17 +46,21 @@ Selected data structures in the source cluster will be migrated to the target cl } } ec.PrintlnUnnecessary("") - var updateTopic *hazelcast.Topic - sts := NewStartStages(ec.Logger(), updateTopic, MakeMigrationID(), ec.GetStringArg(argDMTConfig), ec.Props().GetString(flagOutputDir)) - if !sts.topicListenerID.Default() && sts.updateTopic != nil { - if err := sts.updateTopic.RemoveListener(ctx, sts.topicListenerID); err != nil { - return err - } - } + mID := MakeMigrationID() + sts := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) if _, err := stage.Execute(ctx, ec, any(nil), sp); err != nil { return err } + mStages, err := migrationStages(ctx, ec, mID, ec.Props().GetString(flagOutputDir), sts.statusMap) + if err != nil { + return err + } + mp := stage.NewFixedProvider(mStages...) + if _, err := stage.Execute(ctx, ec, any(nil), mp); err != nil { + return err + } + ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary("OK Migration completed successfully.") return nil diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index ac9ee82c..13133edd 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -5,50 +5,36 @@ package migration import ( "context" "encoding/json" - "errors" "fmt" - "os" - "time" - clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/log" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/cluster" "github.com/hazelcast/hazelcast-go-client/serialization" - "github.com/hazelcast/hazelcast-go-client/types" - "golang.org/x/exp/slices" - - "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" - "github.com/hazelcast/hazelcast-commandline-client/internal/plug" ) type StartStages struct { - migrationID string - configDir string - ci *hazelcast.ClientInternal - startQueue *hazelcast.Queue - statusMap *hazelcast.Map - updateTopic *hazelcast.Topic - topicListenerID types.UUID - updateMsgChan chan UpdateMessage - reportOutputDir string - logger log.Logger + migrationID string + configDir string + ci *hazelcast.ClientInternal + startQueue *hazelcast.Queue + statusMap *hazelcast.Map + logger log.Logger } var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout while reading status: "+ "please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w", context.DeadlineExceeded) -func NewStartStages(logger log.Logger, updateTopic *hazelcast.Topic, migrationID, configDir, reportOutputDir string) *StartStages { +func NewStartStages(logger log.Logger, migrationID, configDir string) *StartStages { if migrationID == "" { panic("migrationID is required") } return &StartStages{ - updateTopic: updateTopic, - migrationID: migrationID, - configDir: configDir, - reportOutputDir: reportOutputDir, - logger: logger, + migrationID: migrationID, + configDir: configDir, + logger: logger, } } @@ -64,13 +50,7 @@ func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.S ProgressMsg: "Starting the migration", SuccessMsg: "Started the migration", FailureMsg: "Could not start the migration", - Func: st.startStage(ec), - }, - { - ProgressMsg: "Migrating the cluster", - SuccessMsg: "Migrated the cluster", - FailureMsg: "Could not migrate the cluster", - Func: st.migrateStage(ec), + Func: st.startStage(), }, } } @@ -86,34 +66,15 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s if err != nil { return nil, fmt.Errorf("retrieving the start Queue: %w", err) } - st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID)) + st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) if err != nil { return nil, fmt.Errorf("retrieving the status Map: %w", err) } - st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID)) - if err != nil { - return nil, fmt.Errorf("retrieving the update Topic: %w", err) - } - st.updateMsgChan = make(chan UpdateMessage) - st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener) - if err != nil { - return nil, fmt.Errorf("adding message listener to update Topic: %w", err) - - } return nil, nil } } -func (st *StartStages) topicListener(event *hazelcast.MessagePublished) { - var u UpdateMessage - err := json.Unmarshal(event.Value.(serialization.JSON), &u) - if err != nil { - st.logger.Warn(fmt.Sprintf("receiving update from migration cluster: %s", err.Error())) - } - st.updateMsgChan <- u -} - -func (st *StartStages) startStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { +func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { cb, err := makeConfigBundle(st.configDir, st.migrationID) if err != nil { @@ -122,11 +83,6 @@ func (st *StartStages) startStage(ec plug.ExecContext) func(context.Context, sta if err = st.startQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating start Queue: %w", err) } - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if isTerminal, err := st.handleUpdateMessage(ctx, ec, <-st.updateMsgChan, status); isTerminal { - return nil, err - } return nil, nil } } @@ -143,84 +99,3 @@ func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) } return b, nil } - -func (st *StartStages) migrateStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { - return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - for { - select { - case msg := <-st.updateMsgChan: - if isTerminal, err := st.handleUpdateMessage(ctx, ec, msg, status); isTerminal { - return nil, err - } - case <-ctx.Done(): - if err := ctx.Err(); err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return nil, timeoutErr - } - return nil, fmt.Errorf("migration failed: %w", err) - } - } - } - } -} - -func (st *StartStages) handleUpdateMessage(ctx context.Context, ec plug.ExecContext, msg UpdateMessage, status stage.Statuser[any]) (bool, error) { - status.SetProgress(msg.CompletionPercentage) - ec.PrintlnUnnecessary(msg.Message) - if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) { - ms, err := readMigrationStatus(ctx, st.statusMap) - if err != nil { - return true, fmt.Errorf("reading status: %w", err) - } - ec.PrintlnUnnecessary(ms.Report) - var name string - if st.reportOutputDir == "" { - name = fmt.Sprintf("migration_report_%s.txt", st.migrationID) - } - if err = saveReportToFile(name, ms.Report); err != nil { - return true, fmt.Errorf("writing report to file: %w", err) - } - if err = st.saveDebugLogs(ctx, ec, st.migrationID, st.ci.OrderedMembers()); err != nil { - return true, fmt.Errorf("writing debug logs to file: %w", err) - } - ec.PrintlnUnnecessary(fmt.Sprintf("migration report saved to file: %s", name)) - for _, l := range ms.Logs { - ec.Logger().Info(l) - } - switch ms.Status { - case StatusComplete: - return true, nil - case StatusCanceled: - return true, clcerrors.ErrUserCancelled - case StatusFailed: - return true, fmt.Errorf("migration failed") - } - } - return false, nil -} - -func saveReportToFile(fileName, report string) error { - f, err := os.Create(fmt.Sprintf(fileName)) - if err != nil { - return err - } - defer f.Close() - return os.WriteFile(fileName, []byte(report), 0600) -} - -func (st *StartStages) saveDebugLogs(ctx context.Context, ec plug.ExecContext, migrationID string, members []cluster.MemberInfo) error { - for _, m := range members { - l, err := st.ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String()) - if err != nil { - return err - } - logs, err := l.GetAll(ctx) - if err != nil { - return err - } - for _, line := range logs { - ec.Logger().Debugf(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string))) - } - } - return nil -} diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index b4c1938a..4514dddf 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestMigration(t *testing.T) { +func TestMigrationStages(t *testing.T) { testCases := []struct { name string f func(t *testing.T) @@ -34,6 +34,14 @@ func TestMigration(t *testing.T) { } func startTest_Successful(t *testing.T) { + startTest(t, successfulRunner, "OK Migration completed successfully.") +} + +func startTest_Failure(t *testing.T) { + startTest(t, failureRunner, "ERROR Failed migrating IMAP: imap5 ...: some error") +} + +func startTest(t *testing.T, runnerFunc func(context.Context, it.TestContext, string, *sync.WaitGroup), expectedOutput string) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { @@ -41,109 +49,45 @@ func startTest_Successful(t *testing.T) { wg.Add(1) go tcx.WithReset(func() { defer wg.Done() - Must(tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes")) + tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes") }) - c := make(chan string, 1) + wg.Add(1) go findMigrationID(ctx, tcx, c) - migrationID := <-c - migrationReport := successfulRunner(migrationID, tcx, ctx) + mID := <-c + wg.Done() + wg.Add(1) + go runnerFunc(ctx, tcx, mID, &wg) wg.Wait() - tcx.AssertStdoutContains(fmt.Sprintf(` -Hazelcast Data Migration Tool v5.3.0 -(c) 2023 Hazelcast, Inc. - -Selected data structures in the source cluster will be migrated to the target cluster. - - - OK [1/3] Connected to the migration cluster. -first message - OK [2/3] Started the migration. -second message -last message -status report -migration report saved to file: migration_report_%s.txt - OK [3/3] Migrated the cluster. - - OK Migration completed successfully.`, migrationID)) - tcx.WithReset(func() { - require.Equal(t, true, fileExists(migrationReport)) - }) - }) -} - -func startTest_Failure(t *testing.T) { - tcx := it.TestContext{T: t} - ctx := context.Background() - tcx.Tester(func(tcx it.TestContext) { - go tcx.WithReset(func() { - tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes") - }) - migrationReport := failureRunner(tcx, ctx) - tcx.AssertStdoutContains(` -Hazelcast Data Migration Tool v5.3.0 -(c) 2023 Hazelcast, Inc. - -Selected data structures in the source cluster will be migrated to the target cluster. - - - OK [1/3] Connected to the migration cluster. -first message - OK [2/3] Started the migration. -second message -fail status report`) + tcx.AssertStdoutContains(expectedOutput) tcx.WithReset(func() { - require.Equal(t, true, fileExists(migrationReport)) + f := fmt.Sprintf("migration_report_%s.txt", mID) + require.Equal(t, true, fileExists(f)) + Must(os.Remove(f)) }) }) } -func fileExists(filename string) bool { - a := MustValue(os.Getwd()) - fmt.Println(a) - _, err := os.Stat(filename) - if os.IsNotExist(err) { - return false - } - Must(os.Remove(filename)) - return true +func successfulRunner(ctx context.Context, tcx it.TestContext, migrationID string, wg *sync.WaitGroup) { + mSQL := fmt.Sprintf(`CREATE MAPPING IF NOT EXISTS %s TYPE IMap OPTIONS('keyFormat'='varchar', 'valueFormat'='json')`, migration.StatusMapName) + MustValue(tcx.Client.SQL().Execute(ctx, mSQL)) + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) + MustValue(statusMap.Put(ctx, migrationID, serialization.JSON(b))) + b = MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) + MustValue(statusMap.Put(ctx, migrationID, serialization.JSON(b))) + wg.Done() } -func successfulRunner(migrationID string, tcx it.TestContext, ctx context.Context) string { - topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) - msg := MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message", CompletionPercentage: 10})) - Must(topic.Publish(ctx, serialization.JSON(msg))) - msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "second message", CompletionPercentage: 20})) - Must(topic.Publish(ctx, serialization.JSON(msg))) - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID))) - b := MustValue(json.Marshal(migration.MigrationStatus{ - Status: migration.StatusComplete, - Report: "status report", - Logs: []string{"log1", "log2"}, - })) - Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b))) - msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusComplete, Message: "last message", CompletionPercentage: 100})) - Must(topic.Publish(ctx, serialization.JSON(msg))) - return fmt.Sprintf("migration_report_%s.txt", migrationID) -} - -func failureRunner(tcx it.TestContext, ctx context.Context) string { - c := make(chan string, 1) - go findMigrationID(ctx, tcx, c) - migrationID := <-c - topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) - msg := MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message", CompletionPercentage: 20})) - Must(topic.Publish(ctx, serialization.JSON(msg))) - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID))) - b := MustValue(json.Marshal(migration.MigrationStatus{ - Status: migration.StatusFailed, - Report: "fail status report", - Errors: []string{"error1", "error2"}, - })) - Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(b))) - msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusFailed, Message: "second message", CompletionPercentage: 60})) - Must(topic.Publish(ctx, serialization.JSON(msg))) - return fmt.Sprintf("migration_report_%s.txt", migrationID) +func failureRunner(ctx context.Context, tcx it.TestContext, migrationID string, wg *sync.WaitGroup) { + mSQL := fmt.Sprintf(`CREATE MAPPING IF NOT EXISTS %s TYPE IMap OPTIONS('keyFormat'='varchar', 'valueFormat'='json')`, migration.StatusMapName) + MustValue(tcx.Client.SQL().Execute(ctx, mSQL)) + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) + MustValue(statusMap.Put(ctx, migrationID, serialization.JSON(b))) + b = MustValue(os.ReadFile("testdata/start/migration_success_failure.json")) + MustValue(statusMap.Put(ctx, migrationID, serialization.JSON(b))) + wg.Done() } func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) { @@ -158,3 +102,12 @@ func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) { } } } + +func fileExists(filename string) bool { + MustValue(os.Getwd()) + _, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return true +} diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index b6d5b1f7..a4f1a41f 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -7,10 +7,8 @@ import ( "encoding/json" "fmt" - "github.com/hazelcast/hazelcast-commandline-client/internal/log" - "golang.org/x/exp/slices" - "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/log" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" @@ -22,8 +20,8 @@ type StatusStages struct { migrationsInProgressList *hazelcast.List statusMap *hazelcast.Map updateTopic *hazelcast.Topic - updateMsgChan chan UpdateMessage - logger log.Logger + // updateMsgChan chan UpdateMessage + logger log.Logger } func NewStatusStages(logger log.Logger) *StatusStages { @@ -76,7 +74,7 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, return nil, fmt.Errorf("parsing migration in progress: %w", err) } st.migrationID = mip.MigrationID - st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID)) + // st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID)) if err != nil { return nil, err } @@ -87,45 +85,47 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, func (st *StatusStages) fetchStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - ms, err := readMigrationStatus(ctx, st.statusMap) - if err != nil { - return nil, fmt.Errorf("reading status: %w", err) - } - if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, ms.Status) { - ec.PrintlnUnnecessary(ms.Report) - return nil, nil - } - st.updateMsgChan = make(chan UpdateMessage) - id, err := st.updateTopic.AddMessageListener(ctx, st.topicListener) - defer st.updateTopic.RemoveListener(ctx, id) - for { - select { - case msg := <-st.updateMsgChan: - ec.PrintlnUnnecessary(msg.Message) - status.SetProgress(msg.CompletionPercentage) - if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) { - ms, err := readMigrationStatus(ctx, st.statusMap) - if err != nil { - return nil, fmt.Errorf("reading status: %w", err) - } - ec.PrintlnUnnecessary(ms.Report) - return nil, nil - } + return nil, nil + /* + ms, err := readMigrationStatus(ctx, st.statusMap) + if err != nil { + return nil, fmt.Errorf("reading status: %w", err) } - } + if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, ms.Status) { + ec.PrintlnUnnecessary(ms.Report) + return nil, nil + } + // st.updateMsgChan = make(chan UpdateMessage) + id, err := st.updateTopic.AddMessageListener(ctx, st.topicListener) + defer st.updateTopic.RemoveListener(ctx, id) + for { + select { + //case msg := <-st.updateMsgChan: + // ec.PrintlnUnnecessary(msg.Message) + // status.SetProgress(msg.CompletionPercentage) + // if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, msg.Status) { + // ms, err := readMigrationStatus(ctx, st.statusMap) + // if err != nil { + // return nil, fmt.Errorf("reading status: %w", err) + } + // ec.PrintlnUnnecessary(ms.Report) + // return nil, nil + // } + // } + }*/ } } func (st *StatusStages) topicListener(event *hazelcast.MessagePublished) { - var u UpdateMessage - v, ok := event.Value.(serialization.JSON) - if !ok { - st.logger.Warn(fmt.Sprintf("update message type is unexpected")) - return - } - err := json.Unmarshal(v, &u) - if err != nil { - st.logger.Warn(fmt.Sprintf("receiving update from migration cluster: %s", err.Error())) - } - st.updateMsgChan <- u + // var u UpdateMessage + // v, ok := event.Value.(serialization.JSON) + // if !ok { + // st.logger.Warn(fmt.Sprintf("update message type is unexpected")) + // return + // } + // err := json.Unmarshal(v, &u) + // if err != nil { + // st.logger.Warn(fmt.Sprintf("receiving update from migration cluster: %s", err.Error())) + // } + // st.updateMsgChan <- u } diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 0e931701..68c462bd 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -3,120 +3,6 @@ package migration_test import ( - "context" - "encoding/json" - "sync" - "testing" - "time" - _ "github.com/hazelcast/hazelcast-commandline-client/base" _ "github.com/hazelcast/hazelcast-commandline-client/base/commands" - "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" - . "github.com/hazelcast/hazelcast-commandline-client/internal/check" - "github.com/hazelcast/hazelcast-commandline-client/internal/it" - "github.com/hazelcast/hazelcast-go-client/serialization" - "github.com/stretchr/testify/require" ) - -func TestStatus(t *testing.T) { - testCases := []struct { - name string - f func(t *testing.T) - }{ - {name: "status", f: statusTest}, - {name: "noMigrationsStatus", f: noMigrationsStatusTest}, - } - for _, tc := range testCases { - t.Run(tc.name, tc.f) - } -} - -func noMigrationsStatusTest(t *testing.T) { - tcx := it.TestContext{T: t} - ctx := context.Background() - tcx.Tester(func(tcx it.TestContext) { - var wg sync.WaitGroup - wg.Add(1) - go tcx.WithReset(func() { - defer wg.Done() - tcx.CLC().Execute(ctx, "status") - }) - wg.Wait() - tcx.AssertStdoutContains("there are no migrations are in progress on migration cluster") - }) -} - -func statusTest(t *testing.T) { - tcx := it.TestContext{T: t} - ctx := context.Background() - tcx.Tester(func(tcx it.TestContext) { - mID := preStatusRunner(t, tcx, ctx) - var wg sync.WaitGroup - wg.Add(1) - go tcx.WithReset(func() { - defer wg.Done() - Must(tcx.CLC().Execute(ctx, "status")) - }) - time.Sleep(1 * time.Second) // give time to status command to register its topic listener - statusRunner(t, mID, tcx, ctx) - wg.Wait() - tcx.AssertStdoutContains(` -Hazelcast Data Migration Tool v5.3.0 -(c) 2023 Hazelcast, Inc. - - OK [1/2] Connected to the migration cluster. -first message -Completion Percentage: 60.000000 -last message -Completion Percentage: 100.000000 -status report - OK [2/2] Fetched migration status. - -OK`) - }) -} - -func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) string { - // create a migration in the __datamigrations_in_progress list - mID := migration.MakeMigrationID() - l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - m := MustValue(json.Marshal(migration.MigrationInProgress{ - MigrationID: mID, - })) - ok := MustValue(l.Add(ctx, serialization.JSON(m))) - require.Equal(t, true, ok) - // create a record in the status map - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(mID))) - st := MustValue(json.Marshal(migration.MigrationStatus{ - Status: migration.StatusInProgress, - Report: "status report", - CompletionPercentage: 60, - })) - Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st))) - return mID -} - -func statusRunner(t *testing.T, migrationID string, tcx it.TestContext, ctx context.Context) { - // publish the first message in the update topic - updateTopic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) - msg := MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message", CompletionPercentage: 60})) - Must(updateTopic.Publish(ctx, serialization.JSON(msg))) - // create a terminal record in status map - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID))) - st := MustValue(json.Marshal(migration.MigrationStatus{ - Status: migration.StatusComplete, - Report: "status report", - CompletionPercentage: 100, - })) - Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st))) - // publish the second message in the update topic - msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusComplete, Message: "last message", CompletionPercentage: 100})) - Must(updateTopic.Publish(ctx, serialization.JSON(msg))) - // remove the migration from the __datamigrations_in_progress list - l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - m := MustValue(json.Marshal(migration.MigrationInProgress{ - MigrationID: migrationID, - })) - ok := MustValue(l.Remove(ctx, serialization.JSON(m))) - require.Equal(t, true, ok) -} diff --git a/base/commands/migration/testdata/start/migration_success_completed.json b/base/commands/migration/testdata/start/migration_success_completed.json new file mode 100644 index 00000000..41d0d8ff --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_completed.json @@ -0,0 +1,69 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 1000, + "totalEntries": 1000, + "completionPercentage": 100 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap12", + "type": "IMAP", + "status": "COMPLETED", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": [] +} \ No newline at end of file diff --git a/base/commands/migration/testdata/start/migration_success_failure.json b/base/commands/migration/testdata/start/migration_success_failure.json new file mode 100644 index 00000000..30b14cbf --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_failure.json @@ -0,0 +1,70 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "FAILED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 1000, + "totalEntries": 1000, + "completionPercentage": 100 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap12", + "type": "IMAP", + "status": "FAILED", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100, + "error": "some error" + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": ["some error"] +} \ No newline at end of file diff --git a/base/commands/migration/testdata/start/migration_success_initial.json b/base/commands/migration/testdata/start/migration_success_initial.json new file mode 100644 index 00000000..5f90ba74 --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_initial.json @@ -0,0 +1,70 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 121, + "totalEntries": 1000, + "completionPercentage": 12.1 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12, + "error": "some error" + }, + { + "name": "imap12", + "type": "IMAP", + "status": "NOT_STARTED", + "entriesMigrated": 0, + "totalEntries": 10000, + "completionPercentage": 0 + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": [] +} \ No newline at end of file diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index e3a899d2..5af0aa0e 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -125,10 +125,6 @@ func MakeMigrationID() string { return types.NewUUID().String() } -func MakeStatusMapName(migrationID string) string { - return StatusMapPrefix + migrationID -} - func MakeUpdateTopicName(migrationID string) string { return UpdateTopicPrefix + migrationID }