
feat(logging): Add startup logging for shard counts #25378

Open
wants to merge 13 commits into base: master-1.x
4 changes: 4 additions & 0 deletions cmd/influxd/run/command.go
@@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error {
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile

sl := NewStartupProgressLogger(s.Logger)
s.SetStartupMetrics(sl)

if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}
14 changes: 14 additions & 0 deletions cmd/influxd/run/server.go
@@ -65,6 +65,11 @@ type BuildInfo struct {
Time string
}

type StartupProgress interface {
AddShard()
CompletedShard()
}

// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
@@ -96,6 +101,8 @@ type Server struct {

Monitor *monitor.Monitor

StartupProgressMetrics StartupProgress

// Server reporting and registration
reportingDisabled bool

@@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) {
s.MuxLogger = tcp.MuxLogger(w)
}

func (s *Server) SetStartupMetrics(sp StartupProgress) {
s.StartupProgressMetrics = sp
}

func (s *Server) appendMonitorService() {
s.Services = append(s.Services, s.Monitor)
}
@@ -465,6 +476,9 @@ func (s *Server) Open() error {
s.MetaClient.WithLogger(s.Logger)
}
s.TSDBStore.WithLogger(s.Logger)

s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics)

if s.config.Data.QueryLogEnabled {
s.QueryExecutor.WithLogger(s.Logger)
} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
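Note: the hunk above calls s.TSDBStore.WithStartupMetrics, and load_shards.go below reads a startupProgressMetrics field, but neither the setter nor the tsdb-side interface is included in this excerpt. A rough sketch of the assumed wiring (names are guesses; tsdb cannot import cmd/influxd/run, so it would need its own interface declaration):

// tsdb/store.go -- not shown in this excerpt; assumed shape only.
type StartupProgress interface {
    AddShard()
    CompletedShard()
}

func (s *Store) WithStartupMetrics(sp StartupProgress) {
    s.startupProgressMetrics = sp
}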
32 changes: 32 additions & 0 deletions cmd/influxd/run/startup_logger.go
@@ -0,0 +1,32 @@
package run

import (
"fmt"
"sync/atomic"

"go.uber.org/zap"
)

type StartupProgressLogger struct {
shardsCompleted atomic.Uint64
shardsTotal atomic.Uint64
logger *zap.Logger
}

func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
return &StartupProgressLogger{
logger: logger,
}
}

func (s *StartupProgressLogger) AddShard() {
s.shardsTotal.Add(1)
}

func (s *StartupProgressLogger) CompletedShard() {
shardsCompleted := s.shardsCompleted.Add(1)
totalShards := s.shardsTotal.Load()

percentShards := float64(shardsCompleted) / float64(totalShards) * 100
s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
}
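A minimal usage sketch of the new logger (hypothetical example test, not part of the PR; zap.NewNop discards all output, so the example's Output block is empty):

// startup_logger_example_test.go -- illustrative only.
package run

import "go.uber.org/zap"

func ExampleStartupProgressLogger() {
    sl := NewStartupProgressLogger(zap.NewNop())

    // Register every shard discovered on disk first, then report each one
    // as it finishes opening.
    for i := 0; i < 3; i++ {
        sl.AddShard()
    }
    sl.CompletedShard() // would log "... 33.3% shards (1 / 3)."
    sl.CompletedShard() // would log "... 66.7% shards (2 / 3)."
    sl.CompletedShard() // would log "... 100.0% shards (3 / 3)."
    // Output:
}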
304 changes: 304 additions & 0 deletions tsdb/load_shards.go
@@ -0,0 +1,304 @@
package tsdb

import (
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/limiter"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// res holds the result from opening each shard in a goroutine
type res struct {
s *Shard
err error
}

func (s *Store) loadShards() error {
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))

// Setup a shared limiter for compactions
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {
lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions

if lim < 1 {
lim = 1
}
}

// Don't allow more compactions to run than cores.
if lim > runtime.GOMAXPROCS(0) {
lim = runtime.GOMAXPROCS(0)
}

s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)

compactionSettings := []zapcore.Field{zap.Int("max_concurrent_compactions", lim)}
throughput := int(s.EngineOptions.Config.CompactThroughput)
throughputBurst := int(s.EngineOptions.Config.CompactThroughputBurst)
if throughput > 0 {
if throughputBurst < throughput {
throughputBurst = throughput
}

compactionSettings = append(
compactionSettings,
zap.Int("throughput_bytes_per_second", throughput),
zap.Int("throughput_bytes_per_second_burst", throughputBurst),
)
s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(throughput, throughputBurst)
} else {
compactionSettings = append(
compactionSettings,
zap.String("throughput_bytes_per_second", "unlimited"),
zap.String("throughput_bytes_per_second_burst", "unlimited"),
)
}

s.Logger.Info("Compaction settings", compactionSettings...)

log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
defer logEnd()

shardLoaderWg := new(sync.WaitGroup)
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)

// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
if err != nil {
return err
}

walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}

// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}

// Retrieve database index.
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}

for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}

for _, sh := range shardDirs {
// The series file should not live inside a retention policy directory, but skip it just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}

if err := fn(sfile, idx, sh, db, rp); err != nil {
return err
}
}
}
}

return nil
}

if s.startupProgressMetrics != nil {
err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
s.startupProgressMetrics.AddShard()
return nil
})
if err != nil {
return err
}
}

err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
shardLoaderWg.Add(1)

go func(db, rp, sh string) {
defer shardLoaderWg.Done()

t.Take()
defer t.Release()

start := time.Now()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)

// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}

if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
resC <- &res{}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}

// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx

// Provide an implementation of the SeriesIDSets interface.
opt.SeriesIDSets = shardSet{store: s, db: db}

// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = InmemIndexName
}

// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)

// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)

err = s.OpenShard(shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}

resC <- &res{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
}(db.Name(), rp.Name(), sh.Name())

return nil
})

if err := s.enableShards(shardLoaderWg, resC); err != nil {
return err
}

return nil
}

func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *res) error {
go func() {
wg.Wait()
close(resC)
}()

for res := range resC {
if res.s == nil || res.err != nil {
continue
}
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
if _, ok := s.databases[res.s.database]; !ok {
s.databases[res.s.database] = new(databaseState)
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}

// Check if any databases are running multiple index types.
for db, state := range s.databases {
if state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
}
}

// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
return err
}
}
}

return nil
}

func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
return nil, nil
}

if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
return nil, nil
}

// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return nil, err
}

return rpDirs, nil
}

func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
if !rpDir.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
return nil, nil
}

// The .series directory is not a retention policy.
if rpDir.Name() == SeriesFileDirectory {
return nil, nil
}

if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
return nil, nil
}

shardDirs, err := os.ReadDir(rpPath)
if err != nil {
return nil, err
}

return shardDirs, nil
}
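A note on the structure of load_shards.go: when startup metrics are set, the store walks the shard directories twice. The first walkShardsAndProcess pass only calls AddShard, establishing the total before any work starts; the second pass opens shards concurrently and calls CompletedShard on every exit path (invalid shard ID, filtered shard, open failure, success), so skipped and failed shards count toward progress as well. Reduced to a sketch (hypothetical helper, simplified types, not part of the PR):

// countThenProcess is illustrative only.
func countThenProcess(items []string, metrics StartupProgress, open func(string) error) {
    if metrics != nil {
        // Pass 1: establish the denominator up front.
        for range items {
            metrics.AddShard()
        }
    }
    for _, it := range items {
        _ = open(it) // Pass 2: do the real work; progress is reported either way.
        if metrics != nil {
            metrics.CompletedShard()
        }
    }
}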