Skip to content

Commit

Permalink
fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Aug 29, 2023
1 parent 3c215f8 commit 06abe13
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions base/commands/migration/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (
)

var statusErrMapping = map[status]error{
statusInProgress: nil,
statusComplete: nil,
statusCanceled: clcerrors.ErrUserCancelled,
statusFailed: errors.New("migration failed"),
statusCanceled: clcerrors.ErrUserCancelled,
statusFailed: errors.New("migration failed"),
}

type Stages struct {
Expand Down Expand Up @@ -99,7 +97,9 @@ func (st *Stages) startStage(ctx context.Context) func(stage.Statuser) error {
if err = st.startQueue.Put(ctx, serialization.JSON(b)); err != nil {
return err
}
if err = st.waitForStatus(ctx, 30*time.Second, []status{statusInProgress, statusComplete}); err != nil {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err = st.waitForStatus(ctx, []status{statusInProgress, statusComplete}, time.Second); err != nil {
return err
}
return nil
Expand All @@ -108,36 +108,35 @@ func (st *Stages) startStage(ctx context.Context) func(stage.Statuser) error {

func (st *Stages) migrateStage(ctx context.Context) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
return st.waitForStatus(ctx, 0, []status{statusComplete})
return st.waitForStatus(ctx, []status{statusComplete}, 5*time.Second)
}
}

func (st *Stages) waitForStatus(ctx context.Context, timeout time.Duration, expected []status) error {
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
func (st *Stages) waitForStatus(ctx context.Context, expected []status, waitInterval time.Duration) error {
for {
timeoutErr := fmt.Errorf("migration could not be completed: reached timeout while reading status: " +
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster")
if err := ctx.Err(); err != nil && errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("migration could not be completed: reached timeout while reading status: " +
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster")
return timeoutErr
}
s, err := st.readStatus(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return timeoutErr
}
return fmt.Errorf("reading status: %w", err)
}
if statusErrMapping[s] != nil {
return statusErrMapping[s]
}
if expectationMet(s, expected) {
if expectationMet(expected, s) {
return nil
}
time.Sleep(3 * time.Second)
time.Sleep(waitInterval)
}
}

func expectationMet(actual status, expected []status) bool {
func expectationMet(expected []status, actual status) bool {
for _, e := range expected {
if e == actual {
return true
Expand Down

0 comments on commit 06abe13

Please sign in to comment.