Skip to content

Commit

Permalink
Merge branch 'dmt' into CLC-429
Browse files Browse the repository at this point in the history
# Conflicts:
#	base/commands/migration/migration_start.go
  • Loading branch information
kutluhanmetin committed Nov 14, 2023
2 parents 6a4d04e + 1af0ff2 commit c03e48a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 26 deletions.
5 changes: 4 additions & 1 deletion base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
status.SetText("Unable to calculate remaining duration and progress")
} else {
status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name))
status.SetProgress(cp)
status.SetProgress(cp / 100.0)
status.SetRemainingDuration(rt)
}
}
Expand Down Expand Up @@ -200,6 +200,9 @@ func saveReportToFile(ctx context.Context, ci *hazelcast.ClientInternal, migrati

func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error {
for {
if ctx.Err() != nil {
return ctx.Err()
}
status, err := fetchMigrationStatus(ctx, ci, migrationID)
if err != nil {
if errors.Is(err, migrationStatusNotFoundErr) {
Expand Down
18 changes: 7 additions & 11 deletions base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func (StartCmd) Init(cc plug.InitContext) error {

func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) {
cmd.SetCancelMsg(" (Ctrl+C to exit) ")
ci, err := ec.ClientInternal(ctx)
if err != nil {
return err
}
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0
(c) 2023 Hazelcast, Inc.
Expand All @@ -64,22 +60,22 @@ In order to cancel the migration, use the 'cancel' command.
}
ec.PrintlnUnnecessary("")
mID := MigrationIDGeneratorFunc()
sts, err := NewStartStages(ec.Logger(), mID, conf)
if err != nil {
return err
}
defer func() {
maybePrintWarnings(ctx, ec, ci, mID)
finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir))
maybePrintWarnings(ctx, ec, sts.ci, mID)
finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID, ec.Props().GetString(flagOutputDir))
if err == nil {
err = finalizeErr
}
}()
sts, err := NewStartStages(ec.Logger(), mID, conf)
if err != nil {
return err
}
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if _, err = stage.Execute(ctx, ec, any(nil), sp); err != nil {
return err
}
mStages, err := createMigrationStages(ctx, ec, ci, mID)
mStages, err := createMigrationStages(ctx, ec, sts.ci, mID)
if err != nil {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions base/commands/migration/migration_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@ func (s StatusCmd) Unwrappable() {}
func (s StatusCmd) Init(cc plug.InitContext) error {
cc.SetCommandUsage("status")
cc.SetCommandGroup("migration")
help := "Get status of the data migration in progress"
help := "Get status of the data migration/estimation in progress"
cc.AddStringFlag(flagOutputDir, "o", "", false, "output directory for the migration report, if not given current directory is used")
cc.SetCommandHelp(help, help)
return nil
}

func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) {
ci, err := ec.ClientInternal(ctx)
if err != nil {
return err
}
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(banner)
sts := NewStatusStages()
Expand All @@ -37,13 +33,13 @@ func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) {
return err
}
defer func() {
maybePrintWarnings(ctx, ec, ci, mID.(string))
finalizeErr := finalizeMigration(ctx, ec, ci, mID.(string), ec.Props().GetString(flagOutputDir))
maybePrintWarnings(ctx, ec, sts.ci, mID.(string))
finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID.(string), ec.Props().GetString(flagOutputDir))
if err == nil {
err = finalizeErr
}
}()
mStages, err := createMigrationStages(ctx, ec, ci, mID.(string))
mStages, err := createMigrationStages(ctx, ec, sts.ci, mID.(string))
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (
if err = st.startQueue.Put(ctx, cb); err != nil {
return nil, fmt.Errorf("updating start Queue: %w", err)
}
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
childCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil {
return nil, err
Expand All @@ -91,7 +91,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (

func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
childCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil {
return nil, fmt.Errorf("waiting for prechecks to complete: %w", err)
Expand All @@ -102,6 +102,9 @@ func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]

func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error {
for {
if ctx.Err() != nil {
return ctx.Err()
}
status, err := fetchMigrationStatus(ctx, ci, migrationID)
if err != nil {
if errors.Is(err, migrationStatusNotFoundErr) {
Expand Down
6 changes: 3 additions & 3 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
Func: st.connectStage(ec),
},
{
ProgressMsg: "Finding migration in progress",
SuccessMsg: "Found migration in progress",
FailureMsg: "Could not find a migration in progress",
ProgressMsg: "Finding migration/estimation in progress",
SuccessMsg: "Found migration/estimation in progress",
FailureMsg: "Could not find a migration/estimation in progress",
Func: st.findMigrationInProgress(ec),
},
}
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/status_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func statusTest(t *testing.T) {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "status", "-o", outDir))
}()
tcx.AssertStdoutContains("Found migration in progress")
tcx.AssertStdoutContains("Found migration/estimation in progress")
setStatusCompleted(mID, tcx, ctx)
wg.Wait()
tcx.WithReset(func() {
Expand Down

0 comments on commit c03e48a

Please sign in to comment.