Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 19, 2023
1 parent 329cc7f commit b422b9a
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,62 +22,62 @@ func NewCancelStages() *CancelStages {
return &CancelStages{}
}

func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
return []stage.Stage{
func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] {
return []stage.Stage[any]{
{
ProgressMsg: "Connecting to the migration cluster",
SuccessMsg: "Connected to the migration cluster",
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ctx, ec),
Func: st.connectStage(ec),
},
{
ProgressMsg: "Canceling the migration",
SuccessMsg: "Canceled the migration",
FailureMsg: "Could not cancel the migration",
Func: st.cancelStage(ctx),
Func: st.cancelStage(),
},
}
}

func (st *CancelStages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
return func(status stage.Statuser) error {
func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
var err error
st.ci, err = ec.ClientInternal(ctx)
if err != nil {
return err
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return err
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return err
return nil, err
}
if len(all) == 0 {
return fmt.Errorf("there are no migrations are in progress on migration cluster")
return nil, fmt.Errorf("there are no migrations are in progress on migration cluster")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return fmt.Errorf("parsing migration in progress: %w", err)
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return err
return nil, err
}
}

func (st *CancelStages) cancelStage(ctx context.Context) func(stage.Statuser) error {
return func(statuser stage.Statuser) error {
func (st *CancelStages) cancelStage() func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
c := CancelItem{ID: st.migrationID}
b, err := json.Marshal(c)
if err != nil {
return err
return nil, err
}
st.cancelQueue.Put(ctx, serialization.JSON(b))
return err
return nil, err
}
}

Expand Down

0 comments on commit b422b9a

Please sign in to comment.