From 26de392d4de8ebfaba94c53085b0f29c49a1a2f2 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 25 Sep 2024 10:28:16 -0500 Subject: [PATCH] feat(tsdb): Adds shard opening progress checks to startup This PR adds a check to see how many shards are remaining vs how many shards are opened as well as the percentage. closes influxdata/feature-requests#476 --- cmd/influxd/run/command.go | 4 + cmd/influxd/run/server.go | 15 ++ cmd/influxd/run/startup_logger.go | 39 ++++ tsdb/store.go | 349 ++++++++++++++++++++---------- tsdb/store_test.go | 112 ++++++++++ 5 files changed, 401 insertions(+), 118 deletions(-) create mode 100644 cmd/influxd/run/startup_logger.go diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index cfb7e67b153..6f30536bf5a 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error { s.Logger = cmd.Logger s.CPUProfile = options.CPUProfile s.MemProfile = options.MemProfile + + sl := NewStartupProgressLogger(s.Logger) + s.SetStartupMetrics(sl) + if err := s.Open(); err != nil { return fmt.Errorf("open server: %s", err) } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index e8b747b4659..7f0a178e356 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -65,6 +65,12 @@ type BuildInfo struct { Time string } +type StartupProgress interface { + AddShard() + RemoveShardFromCount() + CompletedShard() +} + // Server represents a container for the metadata and storage data and services. // It is built using a Config and it manages the startup and shutdown of all // services in the proper order. @@ -96,6 +102,8 @@ type Server struct { Monitor *monitor.Monitor + StartupProgressMetrics StartupProgress + // Server reporting and registration reportingDisabled bool @@ -279,6 +287,10 @@ func (s *Server) SetLogOutput(w io.Writer) { s.MuxLogger = tcp.MuxLogger(w) } +func (s *Server) SetStartupMetrics(sp StartupProgress) { + s.StartupProgressMetrics = sp +} + func (s *Server) appendMonitorService() { s.Services = append(s.Services, s.Monitor) } @@ -465,6 +477,9 @@ func (s *Server) Open() error { s.MetaClient.WithLogger(s.Logger) } s.TSDBStore.WithLogger(s.Logger) + + s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics) + if s.config.Data.QueryLogEnabled { s.QueryExecutor.WithLogger(s.Logger) } else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries { diff --git a/cmd/influxd/run/startup_logger.go b/cmd/influxd/run/startup_logger.go new file mode 100644 index 00000000000..7a8df56717f --- /dev/null +++ b/cmd/influxd/run/startup_logger.go @@ -0,0 +1,39 @@ +package run + +import ( + "fmt" + "sync/atomic" + + "go.uber.org/zap" +) + +type StartupProgressLogger struct { + shardsCompleted atomic.Uint64 + shardsTotal atomic.Uint64 + logger *zap.Logger +} + +func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger { + return &StartupProgressLogger{ + logger: logger, + } +} + +func (s *StartupProgressLogger) AddShard() { + s.shardsTotal.Add(1) +} + +func (s *StartupProgressLogger) RemoveShardFromCount() { + if s.shardsTotal.Load() > 0 { + old, newUint := s.shardsTotal.Load(), s.shardsTotal.Load()-1 + s.shardsTotal.CompareAndSwap(old, newUint) + } +} + +func (s *StartupProgressLogger) CompletedShard() { + shardsCompleted := s.shardsCompleted.Add(1) + totalShards := s.shardsTotal.Load() + + percentShards := float64(shardsCompleted) / float64(totalShards) * 100 + s.logger.Info(fmt.Sprintf("Finished loading shard, current 
progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards)) +} diff --git a/tsdb/store.go b/tsdb/store.go index 74ad52c36b7..80be8c75d3d 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -54,6 +54,12 @@ const SeriesFileDirectory = "_series" // databaseState keeps track of the state of a database. type databaseState struct{ indexTypes map[string]int } +// res holds the result from opening each shard in a goroutine +type res struct { + s *Shard + err error +} + // addIndexType records that the database has a shard with the given index type. func (d *databaseState) addIndexType(indexType string) { if d.indexTypes == nil { @@ -135,6 +141,12 @@ type Store struct { baseLogger *zap.Logger Logger *zap.Logger + startupProgressMetrics interface { + AddShard() + RemoveShardFromCount() + CompletedShard() + } + closing chan struct{} wg sync.WaitGroup opened bool @@ -167,6 +179,14 @@ func (s *Store) WithLogger(log *zap.Logger) { } } +func (s *Store) WithStartupMetrics(sp interface { + AddShard() + RemoveShardFromCount() + CompletedShard() +}) { + s.startupProgressMetrics = sp +} + // Statistics returns statistics for period monitoring. func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() @@ -310,12 +330,6 @@ func (s *Store) Open() error { } func (s *Store) loadShards() error { - // res holds the result from opening each shard in a goroutine - type res struct { - s *Shard - err error - } - // Limit the number of concurrent TSM files to be opened to the number of cores. s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0)) @@ -363,9 +377,9 @@ func (s *Store) loadShards() error { log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open") defer logEnd() + var shardLoaderWg sync.WaitGroup t := limiter.NewFixed(runtime.GOMAXPROCS(0)) resC := make(chan *res) - var n int // Determine how many shards we need to open by checking the store path. dbDirs, err := os.ReadDir(s.path) @@ -373,126 +387,37 @@ func (s *Store) loadShards() error { return err } - for _, db := range dbDirs { - dbPath := filepath.Join(s.path, db.Name()) - if !db.IsDir() { - log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) - continue - } - - if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { - log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) - continue - } - - // Load series file. - sfile, err := s.openSeriesFile(db.Name()) - if err != nil { - return err - } + shardCtx := &ShardContext{ + resC: resC, + wg: &shardLoaderWg, + t: t, + log: log, + } - // Retrieve database index. - idx, err := s.createIndexIfNotExists(db.Name()) + if s.startupProgressMetrics != nil { + err := s.traverseShardsAndProcess(func(ctx *ShardContext) error { + s.startupProgressMetrics.AddShard() + return nil + }, dbDirs, shardCtx) if err != nil { return err } + } - // Load each retention policy within the database directory. - rpDirs, err := os.ReadDir(dbPath) + err = s.traverseShardsAndProcess(func(ctx *ShardContext) error { + err := s.loadAllShards(ctx) if err != nil { return err } + return nil + }, dbDirs, shardCtx) - for _, rp := range rpDirs { - rpPath := filepath.Join(s.path, db.Name(), rp.Name()) - if !rp.IsDir() { - log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) - continue - } - - // The .series directory is not a retention policy. 
- if rp.Name() == SeriesFileDirectory { - continue - } - - if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) { - log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter")) - continue - } - - shardDirs, err := os.ReadDir(rpPath) - if err != nil { - return err - } - - for _, sh := range shardDirs { - // Series file should not be in a retention policy but skip just in case. - if sh.Name() == SeriesFileDirectory { - log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) - continue - } - - n++ - go func(db, rp, sh string) { - t.Take() - defer t.Release() - - start := time.Now() - path := filepath.Join(s.path, db, rp, sh) - walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) - - // Shard file names are numeric shardIDs - shardID, err := strconv.ParseUint(sh, 10, 64) - if err != nil { - log.Info("invalid shard ID found at path", zap.String("path", path)) - resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)} - return - } - - if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { - log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) - resC <- &res{} - return - } - - // Copy options and assign shared index. - opt := s.EngineOptions - opt.InmemIndex = idx - - // Provide an implementation of the ShardIDSets - opt.SeriesIDSets = shardSet{store: s, db: db} - - // Existing shards should continue to use inmem index. - if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { - opt.IndexVersion = InmemIndexName - } - - // Open engine. - shard := NewShard(shardID, path, walPath, sfile, opt) - - // Disable compactions, writes and queries until all shards are loaded - shard.EnableOnOpen = false - shard.CompactionDisabled = s.EngineOptions.CompactionDisabled - shard.WithLogger(s.baseLogger) - - err = s.OpenShard(shard, false) - if err != nil { - log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) - resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} - return - } - - resC <- &res{s: shard} - log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) - }(db.Name(), rp.Name(), sh.Name()) - } - } - } + go func() { + shardLoaderWg.Wait() + close(resC) + }() - // Gather results of opening shards concurrently, keeping track of how - // many databases we are managing. - for i := 0; i < n; i++ { - res := <-resC + for res := range resC { if res.s == nil || res.err != nil { continue } @@ -503,7 +428,6 @@ func (s *Store) loadShards() error { } s.databases[res.s.database].addIndexType(res.s.IndexType()) } - close(resC) // Check if any databases are running multiple index types. 
for db, state := range s.databases { @@ -2367,3 +2291,192 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error { } return nil } + +func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + dbPath := filepath.Join(s.path, db.Name()) + if !db.IsDir() { + log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { + log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) + return nil, nil + } + + // Load each retention policy within the database directory. + rpDirs, err := os.ReadDir(dbPath) + if err != nil { + return nil, err + } + + return rpDirs, nil +} + +func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name()) + if !rpDir.IsDir() { + log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + // The .series directory is not a retention policy. + if rpDir.Name() == SeriesFileDirectory { + return nil, nil + } + + if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) { + log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter")) + return nil, nil + } + + shardDirs, err := os.ReadDir(rpPath) + if err != nil { + return nil, err + } + + return shardDirs, nil +} + +type ShardContext struct { + db os.DirEntry + rp os.DirEntry + sh os.DirEntry + sFile *SeriesFile + resC chan *res + idx interface{} + t limiter.Fixed + wg *sync.WaitGroup + log *zap.Logger +} + +func (s *Store) traverseShardsAndProcess(fn func(ctx *ShardContext) error, dbDirs []os.DirEntry, sharedContext *ShardContext) error { + for _, db := range dbDirs { + rpDirs, err := s.getRetentionPolicyDirs(db, sharedContext.log) + if err != nil { + return err + } else if rpDirs == nil { + continue + } + + // Load series file. + sfile, err := s.openSeriesFile(db.Name()) + if err != nil { + return err + } + + // Retrieve database index. + idx, err := s.createIndexIfNotExists(db.Name()) + if err != nil { + return err + } + + for _, rp := range rpDirs { + shardDirs, err := s.getShards(rp, db, sharedContext.log) + if err != nil { + return err + } else if shardDirs == nil { + continue + } + + for _, sh := range shardDirs { + // Series file should not be in a retention policy but skip just in case. 
+ if sh.Name() == SeriesFileDirectory { + sharedContext.log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) + continue + } + + ctx := &ShardContext{ + db: db, + rp: rp, + sh: sh, + wg: sharedContext.wg, + sFile: sfile, + idx: idx, + resC: sharedContext.resC, + t: sharedContext.t, + log: sharedContext.log, + } + + if err := fn(ctx); err != nil { + return err + } + } + } + } + + return nil +} + +func (s *Store) loadAllShards(opts *ShardContext) error { + opts.wg.Add(1) + + go func(db, rp, sh string) { + defer opts.wg.Done() + + opts.t.Take() + defer opts.t.Release() + + start := time.Now() + path := filepath.Join(s.path, db, rp, sh) + walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) + + // Shard file names are numeric shardIDs + shardID, err := strconv.ParseUint(sh, 10, 64) + if err != nil { + opts.log.Info("invalid shard ID found at path", zap.String("path", path)) + opts.resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.RemoveShardFromCount() + } + return + } + + if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { + opts.log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) + opts.resC <- &res{} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.RemoveShardFromCount() + } + return + } + + // Copy options and assign shared index. + opt := s.EngineOptions + opt.InmemIndex = opts.idx + + // Provide an implementation of the ShardIDSets + opt.SeriesIDSets = shardSet{store: s, db: db} + + // Existing shards should continue to use inmem index. + if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { + opt.IndexVersion = InmemIndexName + } + + // Open engine. + shard := NewShard(shardID, path, walPath, opts.sFile, opt) + + // Disable compactions, writes and queries until all shards are loaded + shard.EnableOnOpen = false + shard.CompactionDisabled = s.EngineOptions.CompactionDisabled + shard.WithLogger(s.baseLogger) + + err = s.OpenShard(shard, false) + if err != nil { + opts.log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) + opts.resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.RemoveShardFromCount() + } + return + } + + opts.resC <- &res{s: shard} + opts.log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + }(opts.db.Name(), opts.rp.Name(), opts.sh.Name()) + + return nil +} diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 3c2010db513..04f75fa7415 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -14,6 +14,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "testing" "time" @@ -144,6 +145,64 @@ func TestStore_CreateShard(t *testing.T) { } } +// Ensure the store can create a new shard. +func TestStore_StartupShardProgress(t *testing.T) { + t.Parallel() + + test := func(index string) { + fmt.Println(index) + s := MustOpenStore(index) + defer s.Close() + + // Create a new shard and verify that it exists. + if err := s.CreateShard("db0", "rp0", 1, true); err != nil { + t.Fatal(err) + } else if sh := s.Shard(1); sh == nil { + t.Fatalf("expected shard") + } + + // Create another shard and verify that it exists. 
+ if err := s.CreateShard("db0", "rp0", 2, true); err != nil { + t.Fatal(err) + } else if sh := s.Shard(2); sh == nil { + t.Fatalf("expected shard") + } + + msl := &mockStartupLogger{} + + // Reopen shard and recheck. + if err := s.ReopenWithStartupMetrics(msl); err != nil { + t.Fatal(err) + } else if sh := s.Shard(1); sh == nil { + t.Fatalf("expected shard(1)") + } else if sh = s.Shard(2); sh == nil { + t.Fatalf("expected shard(2)") + } + + if msl.getShardsAdded() != 2 { + t.Fatalf("expected 2 shards added, got %d", msl.getShardsAdded()) + } + + if msl.getShardsCompleted() != 2 { + t.Fatalf("expected 2 shards completed, got %d", msl.getShardsCompleted()) + } + + // Equality check to make sure shards are always added prior to + // completion being called. + reflect.DeepEqual(msl.shardTracker, []string{ + "shard-add", + "shard-add", + "shard-add", + "shard-complete", + "shard-complete", + }) + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + } +} + func TestStore_BadShard(t *testing.T) { const errStr = "a shard open error" indexes := tsdb.RegisteredIndexes() @@ -2682,6 +2741,25 @@ func (s *Store) Reopen() error { return s.Store.Open() } +// Reopen closes and reopens the store as a new store. +func (s *Store) ReopenWithStartupMetrics(msl *mockStartupLogger) error { + if err := s.Store.Close(); err != nil { + return err + } + + s.Store = tsdb.NewStore(s.Path()) + s.EngineOptions.IndexVersion = s.index + s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") + s.EngineOptions.Config.TraceLoggingEnabled = true + + s.WithStartupMetrics(msl) + + if testing.Verbose() { + s.WithLogger(logger.New(os.Stdout)) + } + return s.Store.Open() +} + // Close closes the store and removes the underlying data. func (s *Store) Close() error { defer os.RemoveAll(s.Path()) @@ -2754,3 +2832,37 @@ func dirExists(path string) bool { } return !os.IsNotExist(err) } + +type mockStartupLogger struct { + shardTracker []string + mu sync.Mutex + shardsCompletedCalled atomic.Uint64 + shardsAddedCalled atomic.Uint64 +} + +func (m *mockStartupLogger) AddShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-add")) + m.mu.Unlock() + m.shardsAddedCalled.Add(1) +} +func (m *mockStartupLogger) CompletedShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-complete")) + m.mu.Unlock() + m.shardsCompletedCalled.Add(1) +} +func (m *mockStartupLogger) RemoveShardFromCount() { + if m.shardsAddedCalled.Load() > 0 { + old, newUint := m.shardsAddedCalled.Load(), m.shardsAddedCalled.Load()-1 + m.shardsAddedCalled.CompareAndSwap(old, newUint) + } +} + +func (m *mockStartupLogger) getShardsAdded() uint64 { + return m.shardsAddedCalled.Load() +} + +func (m *mockStartupLogger) getShardsCompleted() uint64 { + return m.shardsCompletedCalled.Load() +}