Skip to content

Commit

Permalink
fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 19, 2023
1 parent b5554a9 commit e35e7b5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 23 deletions.
4 changes: 4 additions & 0 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ const (
)

const flagOutputDir = "output-dir"

const banner = `Hazelcast Data Migration Tool v5.3.0
(c) 2023 Hazelcast, Inc.
`
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
ec.PrintlnUnnecessary("")
var updateTopic *hazelcast.Topic
sts := NewStartStages(updateTopic, MakeMigrationID(), ec.Args()[0], ec.Props().GetString(flagOutputDir))
sts := NewStartStages(ec.Logger(), updateTopic, MakeMigrationID(), ec.Args()[0], ec.Props().GetString(flagOutputDir))
if !sts.topicListenerID.Default() && sts.updateTopic != nil {
if err := sts.updateTopic.RemoveListener(ctx, sts.topicListenerID); err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions base/commands/migration/migration_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ func (s StatusCmd) Init(cc plug.InitContext) error {

func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) error {
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0
(c) 2023 Hazelcast, Inc.
`)
sts := NewStatusStages()
ec.PrintlnUnnecessary(banner)
sts := NewStatusStages(ec.Logger())
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
Expand Down
34 changes: 20 additions & 14 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors"
"github.com/hazelcast/hazelcast-commandline-client/internal/log"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/cluster"
"github.com/hazelcast/hazelcast-go-client/serialization"
Expand All @@ -31,13 +32,14 @@ type StartStages struct {
topicListenerID types.UUID
updateMsgChan chan UpdateMessage
reportOutputDir string
logger log.Logger
}

var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout while reading status: "+
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w",
context.DeadlineExceeded)

func NewStartStages(updateTopic *hazelcast.Topic, migrationID, configDir, reportOutputDir string) *StartStages {
func NewStartStages(logger log.Logger, updateTopic *hazelcast.Topic, migrationID, configDir, reportOutputDir string) *StartStages {
if migrationID == "" {
panic("migrationID is required")
}
Expand All @@ -46,6 +48,7 @@ func NewStartStages(updateTopic *hazelcast.Topic, migrationID, configDir, report
migrationID: migrationID,
configDir: configDir,
reportOutputDir: reportOutputDir,
logger: logger,
}
}

Expand Down Expand Up @@ -81,27 +84,31 @@ func (st *StartStages) connectStage(ctx context.Context, ec plug.ExecContext) fu
}
st.startQueue, err = st.ci.Client().GetQueue(ctx, StartQueueName)
if err != nil {
return err
return fmt.Errorf("retrieving the start Queue: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, MakeStatusMapName(st.migrationID))
if err != nil {
return err
return fmt.Errorf("retrieving the status Map: %w", err)
}
st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID))
if err != nil {
return err
return fmt.Errorf("retrieving the update Topic: %w", err)
}
st.updateMsgChan = make(chan UpdateMessage)
st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
return err
if err != nil {
return fmt.Errorf("adding message listener to update Topic: %w", err)

}
return nil
}
}

func (st *StartStages) topicListener(event *hazelcast.MessagePublished) {
var u UpdateMessage
err := json.Unmarshal(event.Value.(serialization.JSON), &u)
if err != nil {
panic(fmt.Errorf("receiving update from migration cluster: %w", err))
st.logger.Warn(fmt.Sprintf("receiving update from migration cluster: %s", err.Error()))
}
st.updateMsgChan <- u
}
Expand All @@ -110,10 +117,10 @@ func (st *StartStages) startStage(ctx context.Context, ec plug.ExecContext) func
return func(status stage.Statuser) error {
cb, err := makeConfigBundle(st.configDir, st.migrationID)
if err != nil {
return err
return fmt.Errorf("making configuration bundle: %w", err)
}
if err = st.startQueue.Put(ctx, cb); err != nil {
return err
return fmt.Errorf("updating start Queue: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand Down Expand Up @@ -173,7 +180,7 @@ func (st *StartStages) handleUpdateMessage(ctx context.Context, ec plug.ExecCont
if err = saveReportToFile(name, ms.Report); err != nil {
return true, fmt.Errorf("writing report to file: %w", err)
}
if err = st.saveDebugLogs(ctx, ec, st.ci.OrderedMembers()); err != nil {
if err = st.saveDebugLogs(ctx, ec, st.migrationID, st.ci.OrderedMembers()); err != nil {
return true, fmt.Errorf("writing debug logs to file: %w", err)
}
ec.PrintlnUnnecessary(fmt.Sprintf("migration report saved to file: %s", name))
Expand All @@ -198,11 +205,10 @@ func saveReportToFile(fileName, report string) error {
return err
}
defer f.Close()
_, err = f.WriteString(report)
return err
return os.WriteFile(fileName, []byte(report), 0600)
}

func (st *StartStages) saveDebugLogs(ctx context.Context, ec plug.ExecContext, members []cluster.MemberInfo) error {
func (st *StartStages) saveDebugLogs(ctx context.Context, ec plug.ExecContext, migrationID string, members []cluster.MemberInfo) error {
for _, m := range members {
l, err := st.ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String())
if err != nil {
Expand All @@ -212,8 +218,8 @@ func (st *StartStages) saveDebugLogs(ctx context.Context, ec plug.ExecContext, m
if err != nil {
return err
}
for _, l := range logs {
ec.Logger().Debugf(l.(string))
for _, line := range logs {
ec.Logger().Debugf(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string)))
}
}
return nil
Expand Down
15 changes: 11 additions & 4 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/internal/log"
"golang.org/x/exp/slices"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
Expand All @@ -22,10 +23,11 @@ type StatusStages struct {
statusMap *hazelcast.Map
updateTopic *hazelcast.Topic
updateMsgChan chan UpdateMessage
logger log.Logger
}

func NewStatusStages() *StatusStages {
return &StatusStages{}
func NewStatusStages(logger log.Logger) *StatusStages {
return &StatusStages{logger: logger}
}

func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
Expand Down Expand Up @@ -116,9 +118,14 @@ func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) fun

func (st *StatusStages) topicListener(event *hazelcast.MessagePublished) {
var u UpdateMessage
err := json.Unmarshal(event.Value.(serialization.JSON), &u)
v, ok := event.Value.(serialization.JSON)
if !ok {
st.logger.Warn(fmt.Sprintf("update message type is unexpected"))
return
}
err := json.Unmarshal(v, &u)
if err != nil {
panic(fmt.Errorf("receiving update from migration cluster: %w", err))
st.logger.Warn(fmt.Sprintf("receiving update from migration cluster: %s", err.Error()))
}
st.updateMsgChan <- u
}

0 comments on commit e35e7b5

Please sign in to comment.