Skip to content

Commit

Permalink
feat(tsdb): Small refactor for looping through shards
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Sep 21, 2024
1 parent ed924cb commit 1ce74a8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 37 deletions.
8 changes: 8 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Server struct {

Monitor *monitor.Monitor

StartupProgressMetrics *StartupProgressLogger

// Server reporting and registration
reportingDisabled bool

Expand Down Expand Up @@ -465,6 +467,12 @@ func (s *Server) Open() error {
s.MetaClient.WithLogger(s.Logger)
}
s.TSDBStore.WithLogger(s.Logger)
// TODO: REMOVE THIS
s.config.Logging.StartupProgress = true
if s.config.Logging.StartupProgress {
startupProgressLogger := NewStartupProgressLogger(s.Logger)
s.TSDBStore.WithStartupMetrics(&startupProgressLogger)
}
if s.config.Data.QueryLogEnabled {
s.QueryExecutor.WithLogger(s.Logger)
} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type StartupProgressLogger struct {
logger *zap.Logger
}

func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
return &StartupProgressLogger{
func NewStartupProgressLogger(logger *zap.Logger) StartupProgressLogger {
return StartupProgressLogger{
logger: logger,
}
}
Expand Down
7 changes: 4 additions & 3 deletions logger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

// Config represents the configuration for creating a zap.Logger.
type Config struct {
Format string `toml:"format"`
Level zapcore.Level `toml:"level"`
SuppressLogo bool `toml:"suppress-logo"`
Format string `toml:"format"`
Level zapcore.Level `toml:"level"`
SuppressLogo bool `toml:"suppress-logo"`
StartupProgress bool `toml:"startup-progress"`
}

// NewConfig returns a new instance of Config with defaults.
Expand Down
119 changes: 87 additions & 32 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type Store struct {
baseLogger *zap.Logger
Logger *zap.Logger

startupProgressMetrics *StartupProgress
startupProgressMetrics StartupProgress
enableStartupProgress bool

closing chan struct{}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *Store) WithLogger(log *zap.Logger) {
}
}

func (s *Store) WithStartupMetrics(sp *StartupProgress) {
func (s *Store) WithStartupMetrics(sp StartupProgress) {
s.startupProgressMetrics = sp
s.enableStartupProgress = true
}
Expand Down Expand Up @@ -390,15 +390,40 @@ func (s *Store) loadShards() error {
return err
}

for _, db := range dbDirs {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
if s.enableStartupProgress {
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}

for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}

for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}
s.startupProgressMetrics.AddShard()
}
}
}
}

if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}

Expand All @@ -414,32 +439,12 @@ func (s *Store) loadShards() error {
return err
}

// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return err
}

for _, rp := range rpDirs {
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
if !rp.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
continue
}

// The .series directory is not a retention policy.
if rp.Name() == SeriesFileDirectory {
continue
}

if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
continue
}

shardDirs, err := os.ReadDir(rpPath)
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}

for _, sh := range shardDirs {
Expand Down Expand Up @@ -501,6 +506,10 @@ func (s *Store) loadShards() error {

resC <- &res{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))

if s.enableStartupProgress == true {
s.startupProgressMetrics.CompletedShard()
}
}(db.Name(), rp.Name(), sh.Name())
}
}
Expand Down Expand Up @@ -2384,3 +2393,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
}
return nil
}

func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
return nil, nil
}

if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
return nil, nil
}

// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return nil, err
}

return rpDirs, nil
}

func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
if !rpDir.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
return nil, nil
}

// The .series directory is not a retention policy.
if rpDir.Name() == SeriesFileDirectory {
return nil, nil
}

if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
return nil, nil
}

shardDirs, err := os.ReadDir(rpPath)
if err != nil {
return nil, err
}

return shardDirs, nil
}

0 comments on commit 1ce74a8

Please sign in to comment.