diff --git a/base/commands/migration/stages.go b/base/commands/migration/stages.go index 30f5084d..9526b549 100644 --- a/base/commands/migration/stages.go +++ b/base/commands/migration/stages.go @@ -16,10 +16,8 @@ import ( ) var statusErrMapping = map[status]error{ - statusInProgress: nil, - statusComplete: nil, - statusCanceled: clcerrors.ErrUserCancelled, - statusFailed: errors.New("migration failed"), + statusCanceled: clcerrors.ErrUserCancelled, + statusFailed: errors.New("migration failed"), } type Stages struct { @@ -99,7 +97,9 @@ func (st *Stages) startStage(ctx context.Context) func(stage.Statuser) error { if err = st.startQueue.Put(ctx, serialization.JSON(b)); err != nil { return err } - if err = st.waitForStatus(ctx, 30*time.Second, []status{statusInProgress, statusComplete}); err != nil { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err = st.waitForStatus(ctx, []status{statusInProgress, statusComplete}, time.Second); err != nil { return err } return nil @@ -108,36 +108,35 @@ func (st *Stages) startStage(ctx context.Context) func(stage.Statuser) error { func (st *Stages) migrateStage(ctx context.Context) func(statuser stage.Statuser) error { return func(stage.Statuser) error { - return st.waitForStatus(ctx, 0, []status{statusComplete}) + return st.waitForStatus(ctx, []status{statusComplete}, 5*time.Second) } } -func (st *Stages) waitForStatus(ctx context.Context, timeout time.Duration, expected []status) error { - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } +func (st *Stages) waitForStatus(ctx context.Context, expected []status, waitInterval time.Duration) error { for { + timeoutErr := fmt.Errorf("migration could not be completed: reached timeout while reading status: " + + "please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster") if err := ctx.Err(); err != nil && errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("migration could not be completed: reached timeout while reading status: " + - "please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster") + return timeoutErr } s, err := st.readStatus(ctx) if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return timeoutErr + } return fmt.Errorf("reading status: %w", err) } if statusErrMapping[s] != nil { return statusErrMapping[s] } - if expectationMet(s, expected) { + if expectationMet(expected, s) { return nil } - time.Sleep(3 * time.Second) + time.Sleep(waitInterval) } } -func expectationMet(actual status, expected []status) bool { +func expectationMet(expected []status, actual status) bool { for _, e := range expected { if e == actual { return true