Skip to content

Commit

Permalink
handle case if coordinator cache is invalid
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Aug 25, 2023
1 parent 3f4fcea commit ca0a881
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
16 changes: 16 additions & 0 deletions clc/cmd/clc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/hazelcast/hazelcast-go-client"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -376,7 +377,22 @@ func (m *Main) createCommands() error {
return nil
}

func (m *Main) ensureClientWithTimeout(ctx context.Context, cfg hazelcast.Config, t time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, t)
defer cancel()
err := m.ensureClient(ctx, cfg)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
return err
}
}
}

func (m *Main) ensureClient(ctx context.Context, cfg hazelcast.Config) error {
time.Sleep(6 * time.Second)
if m.ci.Load() != nil {
return nil
}
Expand Down
27 changes: 21 additions & 6 deletions clc/cmd/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/fatih/color"
"github.com/hazelcast/hazelcast-commandline-client/internal/viridian"
"github.com/hazelcast/hazelcast-go-client"
"github.com/spf13/cobra"
"github.com/theckman/yacspin"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/output"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-commandline-client/internal/terminal"
"github.com/hazelcast/hazelcast-commandline-client/internal/viridian"
)

const (
Expand Down Expand Up @@ -116,13 +116,28 @@ func (ec *ExecContext) ClientInternal(ctx context.Context) (*hazelcast.ClientInt
if err != nil {
return nil, err
}
if err = viridian.MaybeOptimizeDiscovery(&cfg, ec.Logger()); err != nil {
return nil, err
}
civ, stop, err := ec.ExecuteBlocking(ctx, func(ctx context.Context, sp clc.Spinner) (any, error) {
sp.SetText("Connecting to the cluster")
if err := ec.main.ensureClient(ctx, cfg); err != nil {
return nil, err
var ensured bool
if cfg.Cluster.Cloud.Token != "" { // if it is Viridian Cluster
appliedFromCache := viridian.ApplyViridianDiscoveryConfig(&cfg, ec.Logger())
if err = ec.main.ensureClientWithTimeout(ctx, cfg, 5*time.Second); err != nil {
if appliedFromCache && errors.Is(err, context.DeadlineExceeded) {
if err = viridian.DeleteCache(ec.Logger(), cfg); err != nil {
return nil, err
}
viridian.ApplyViridianDiscoveryConfig(&cfg, ec.Logger())
} else {
return nil, err
}
} else {
ensured = true
}
}
if !ensured {
if err = ec.main.ensureClient(ctx, cfg); err != nil {
return nil, err
}
}
return ec.main.clientInternal(), nil
})
Expand Down
31 changes: 21 additions & 10 deletions internal/viridian/discovery_optimization.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const (
storePath = "store"
storePath = "caches/coordinator"
addressesKeyFormat = "viridian.addresses.%s"
invalidKeyFormat = "viridian.invalid.%s"
discoveryEndpoint = "%s/cluster/discovery?token=%s"
Expand All @@ -32,27 +32,30 @@ type address struct {
PublicAddress string `json:"public-address"`
}

func MaybeOptimizeDiscovery(cfg *hazelcast.Config, log log.Logger) error {
if cfg.Cluster.Cloud.Token == "" { // not a viridian cluster
return nil
}
func ApplyViridianDiscoveryConfig(cfg *hazelcast.Config, log log.Logger) bool {
var appliedFromCache bool
var addresses []address
sa := store.NewStoreAccessor(filepath.Join(paths.Home(), storePath), log)
addresses, err := getFromCache(sa, cfg)
if err != nil {
return err
log.Debugf("Error: reading addresses from cache: %w")
return appliedFromCache
}
if len(addresses) == 0 {
if len(addresses) > 0 {
appliedFromCache = true
} else {
addresses, err = getFromAPI(cfg.Cluster.Cloud.Token)
if err != nil {
return err
log.Debugf("Error: fetching addresses from API: %w")
return appliedFromCache
}
if err = updateCache(sa, addresses, cfg); err != nil {
return err
log.Debugf("Error: saving addresses to cache: %w")
return appliedFromCache
}
}
modifyStrategy(cfg, addresses)
return nil
return appliedFromCache
}

func getFromCache(sa *store.StoreAccessor, cfg *hazelcast.Config) ([]address, error) {
Expand Down Expand Up @@ -120,6 +123,14 @@ func updateCache(sa *store.StoreAccessor, addresses []address, cfg *hazelcast.Co
return err
}

func DeleteCache(l log.Logger, cfg hazelcast.Config) error {
sa := store.NewStoreAccessor(filepath.Join(paths.Home(), storePath), l)
_, err := sa.WithLock(func(s *store.Store) (any, error) {
return nil, s.DeleteEntriesWithPrefix(fmt.Sprintf(addressesKeyFormat, cfg.Cluster.Name))
})
return err
}

func getFromAPI(token string) ([]address, error) {
var addresses []address
r, err := http.DefaultClient.Get(fmt.Sprintf(discoveryEndpoint, APIBaseURL(), token))
Expand Down

0 comments on commit ca0a881

Please sign in to comment.