diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 8d7baba7..fa96cbc2 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -448,18 +448,20 @@ class TLogManager::TImpl { ShutdownRequested_.store(true); + auto config = Config_.Acquire(); + if (LoggingThread_->GetThreadId() == GetCurrentThreadId()) { FlushWriters(); } else { // Wait for all previously enqueued messages to be flushed // but no more than ShutdownGraceTimeout to prevent hanging. - Synchronize(TInstant::Now() + Config_->ShutdownGraceTimeout); + Synchronize(TInstant::Now() + config->ShutdownGraceTimeout); } // For now this is the only way to wait for log writers that perform asynchronous flushes. // TODO(achulkov2): Refactor log manager to support asynchronous operations. - if (Config_->ShutdownBusyTimeout) { - Sleep(Config_->ShutdownBusyTimeout); + if (config->ShutdownBusyTimeout != TDuration::Zero()) { + Sleep(config->ShutdownBusyTimeout); } EventQueue_->Shutdown(); @@ -486,13 +488,14 @@ class TLogManager::TImpl } auto guard = Guard(SpinLock_); + auto config = Config_.Acquire(); auto it = NameToCategory_.find(categoryName); if (it == NameToCategory_.end()) { auto category = std::make_unique(); category->Name = categoryName; category->ActualVersion = &Version_; it = NameToCategory_.emplace(categoryName, std::move(category)).first; - DoUpdateCategory(it->second.get()); + DoUpdateCategory(config, it->second.get()); } return it->second.get(); } @@ -500,14 +503,17 @@ class TLogManager::TImpl void UpdateCategory(TLoggingCategory* category) { auto guard = Guard(SpinLock_); - DoUpdateCategory(category); + auto config = Config_.Acquire(); + DoUpdateCategory(config, category); } void UpdateAnchor(TLoggingAnchor* anchor) { auto guard = Guard(SpinLock_); + auto config = Config_.Acquire(); + bool enabled = true; - for (const auto& prefix : Config_->SuppressedMessages) { + for (const auto& prefix : config->SuppressedMessages) { if 
(anchor->AnchorMessage.StartsWith(prefix)) { enabled = false; break; @@ -645,11 +651,9 @@ class TLogManager::TImpl void SuppressRequest(TRequestId requestId) { - if (!RequestSuppressionEnabled_) { - return; + if (RequestSuppressionEnabled_.load(std::memory_order_relaxed)) { + SuppressedRequestIdQueue_.Enqueue(requestId); } - - SuppressedRequestIdQueue_.Enqueue(requestId); } void Synchronize(TInstant deadline = TInstant::Max()) @@ -704,6 +708,8 @@ class TLogManager::TImpl void EnsureStarted() { + VERIFY_THREAD_AFFINITY_ANY(); + std::call_once(Started_, [&] { if (LoggingThread_->IsStopping()) { return; @@ -721,7 +727,9 @@ class TLogManager::TImpl }); } - const std::vector& GetWriters(const TLogEvent& event) + const std::vector& GetWriters( + const TLogManagerConfigPtr& config, + const TLogEvent& event) { VERIFY_THREAD_AFFINITY(LoggingThread); @@ -736,7 +744,7 @@ class TLogManager::TImpl } THashSet writerNames; - for (const auto& rule : Config_->Rules) { + for (const auto& rule : config->Rules) { if (rule->IsApplicable(event.Category->Name, event.Level, event.Family)) { writerNames.insert(rule->Writers.begin(), rule->Writers.end()); } @@ -783,10 +791,7 @@ class TLogManager::TImpl return; } - AbortOnAlert_.store(event.Config->AbortOnAlert); - EnsureStarted(); - FlushWriters(); try { @@ -797,8 +802,10 @@ class TLogManager::TImpl } } - std::unique_ptr CreateFormatter(const TLogWriterConfigPtr& writerConfig) + static std::unique_ptr CreateFormatter(const TLogWriterConfigPtr& writerConfig) { + VERIFY_THREAD_AFFINITY_ANY(); + switch (writerConfig->Format) { case ELogFormat::PlainText: return std::make_unique( @@ -821,7 +828,10 @@ class TLogManager::TImpl void DoUpdateConfig(const TLogManagerConfigPtr& config, bool fromEnv) { - if (AreNodesEqual(ConvertToNode(Config_), ConvertToNode(config))) { + // This could be called both from ctor and from LoggingThread. 
+ + auto oldConfig = Config_.Acquire(); + if (AreNodesEqual(ConvertToNode(oldConfig), ConvertToNode(config))) { return; } @@ -876,31 +886,36 @@ class TLogManager::TImpl category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed); } - Config_ = config; ConfiguredFromEnv_.store(fromEnv); - HighBacklogWatermark_.store(Config_->HighBacklogWatermark); - LowBacklogWatermark_.store(Config_->LowBacklogWatermark); - RequestSuppressionEnabled_.store(Config_->RequestSuppressionTimeout != TDuration::Zero()); + HighBacklogWatermark_.store(config->HighBacklogWatermark); + LowBacklogWatermark_.store(config->LowBacklogWatermark); + RequestSuppressionEnabled_.store(config->RequestSuppressionTimeout != TDuration::Zero()); + AbortOnAlert_.store(config->AbortOnAlert); - CompressionThreadPool_->Configure(Config_->CompressionThreadCount); + CompressionThreadPool_->Configure(config->CompressionThreadCount); if (RequestSuppressionEnabled_) { - SuppressedRequestIdSet_.SetTtl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2); + SuppressedRequestIdSet_.SetTtl((config->RequestSuppressionTimeout + DequeuePeriod) * 2); } else { SuppressedRequestIdSet_.Clear(); SuppressedRequestIdQueue_.DequeueAll(); } - FlushExecutor_->SetPeriod(Config_->FlushPeriod); - WatchExecutor_->SetPeriod(Config_->WatchPeriod); - CheckSpaceExecutor_->SetPeriod(Config_->CheckSpacePeriod); - FileRotationExecutor_->SetPeriod(Config_->RotationCheckPeriod); + FlushExecutor_->SetPeriod(config->FlushPeriod); + WatchExecutor_->SetPeriod(config->WatchPeriod); + CheckSpaceExecutor_->SetPeriod(config->CheckSpacePeriod); + FileRotationExecutor_->SetPeriod(config->RotationCheckPeriod); + Config_.Store(std::move(config)); Version_++; } - void WriteEvent(const TLogEvent& event) + void WriteEvent( + const TLogManagerConfigPtr& config, + const TLogEvent& event) { + VERIFY_THREAD_AFFINITY(LoggingThread); + if (ReopenRequested_.exchange(false)) { ReloadWriters(); } @@ 
-912,21 +927,26 @@ class TLogManager::TImpl event.Anchor->ByteCounter.Current += std::ssize(event.MessageRef); } - for (const auto& writer : GetWriters(event)) { + for (const auto& writer : GetWriters(config, event)) { writer->Write(event); } } void FlushWriters() { + VERIFY_THREAD_AFFINITY(LoggingThread); + for (const auto& [name, writer] : NameToWriter_) { writer->Flush(); } + FlushedEvents_ = WrittenEvents_.load(); } void RotateFiles() { + VERIFY_THREAD_AFFINITY(LoggingThread); + for (const auto& [name, writer] : NameToWriter_) { if (auto fileWriter = DynamicPointerCast(writer)) { fileWriter->MaybeRotate(); @@ -936,6 +956,8 @@ class TLogManager::TImpl void ReloadWriters() { + VERIFY_THREAD_AFFINITY(LoggingThread); + Version_++; for (const auto& [name, writer] : NameToWriter_) { writer->Reload(); @@ -944,15 +966,20 @@ class TLogManager::TImpl void CheckSpace() { + VERIFY_THREAD_AFFINITY(LoggingThread); + + auto config = Config_.Acquire(); for (const auto& [name, writer] : NameToWriter_) { if (auto fileWriter = DynamicPointerCast(writer)) { - fileWriter->CheckSpace(Config_->MinDiskSpace); + fileWriter->CheckSpace(config->MinDiskSpace); } } } void RegisterNotificatonWatch(TNotificationWatch* watch) { + VERIFY_THREAD_AFFINITY(LoggingThread); + if (watch->IsValid()) { // Watch can fail to initialize if the writer is disabled // e.g. due to the lack of space. 
@@ -1004,6 +1031,8 @@ class TLogManager::TImpl void PushEvent(TLoggerQueueItem&& event) { + VERIFY_THREAD_AFFINITY_ANY(); + auto& perThreadQueue = PerThreadQueue(); if (!perThreadQueue) { perThreadQueue = new TThreadLocalQueue(); @@ -1020,9 +1049,10 @@ class TLogManager::TImpl const TCounter& GetWrittenEventsCounter(const TLogEvent& event) { + VERIFY_THREAD_AFFINITY_ANY(); + auto key = std::pair(event.Category->Name, event.Level); auto it = WrittenEventsCounters_.find(key); - if (it == WrittenEventsCounters_.end()) { // TODO(prime@): optimize sensor count auto counter = Profiler @@ -1038,6 +1068,8 @@ class TLogManager::TImpl void CollectSensors(ISensorWriter* writer) override { + VERIFY_THREAD_AFFINITY_ANY(); + auto writtenEvents = WrittenEvents_.load(); auto enqueuedEvents = EnqueuedEvents_.load(); auto suppressedEvents = SuppressedEvents_.load(); @@ -1053,6 +1085,8 @@ class TLogManager::TImpl void OnDiskProfiling() { + VERIFY_THREAD_AFFINITY(LoggingThread); + try { auto minLogStorageAvailableSpace = std::numeric_limits::max(); auto minLogStorageFreeSpace = std::numeric_limits::max(); @@ -1085,6 +1119,8 @@ class TLogManager::TImpl std::vector CaptureAnchorStats() { + VERIFY_THREAD_AFFINITY(LoggingThread); + auto now = TInstant::Now(); auto deltaSeconds = (now - LastAnchorStatsCaptureTime_).SecondsFloat(); LastAnchorStatsCaptureTime_ = now; @@ -1112,14 +1148,17 @@ class TLogManager::TImpl void OnAnchorProfiling() { - if (Config_->EnableAnchorProfiling && !AnchorBufferedProducer_) { + VERIFY_THREAD_AFFINITY(LoggingThread); + + auto config = Config_.Acquire(); + if (config->EnableAnchorProfiling && !AnchorBufferedProducer_) { AnchorBufferedProducer_ = New(); Profiler .WithSparse() .WithDefaultDisabled() .WithProducerRemoveSupport() .AddProducer("/anchors", AnchorBufferedProducer_); - } else if (!Config_->EnableAnchorProfiling && AnchorBufferedProducer_) { + } else if (!config->EnableAnchorProfiling && AnchorBufferedProducer_) { AnchorBufferedProducer_.Reset(); } @@ 
-1131,7 +1170,7 @@ class TLogManager::TImpl TSensorBuffer sensorBuffer; for (const auto& stat : stats) { - if (stat.MessageRate < Config_->MinLoggedMessageRateToProfile) { + if (stat.MessageRate < config->MinLoggedMessageRateToProfile) { continue; } TWithTagGuard tagGuard(&sensorBuffer, "message", stat.Anchor->AnchorMessage); @@ -1268,20 +1307,24 @@ class TLogManager::TImpl WrittenEvents_ += eventsWritten; - if (!Config_->FlushPeriod || ShutdownRequested_) { + auto config = Config_.Acquire(); + if (!config->FlushPeriod || ShutdownRequested_) { FlushWriters(); } } int ProcessTimeOrderedBuffer() { + VERIFY_THREAD_AFFINITY(LoggingThread); + int eventsWritten = 0; int eventsSuppressed = 0; SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll()); auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed); - auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout); + auto config = Config_.Acquire(); + auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(config->RequestSuppressionTimeout); while (!TimeOrderedBuffer_.empty()) { const auto& event = TimeOrderedBuffer_.front(); @@ -1300,7 +1343,7 @@ class TLogManager::TImpl if (requestSuppressionEnabled && event.RequestId && SuppressedRequestIdSet_.Contains(event.RequestId)) { ++eventsSuppressed; } else { - WriteEvent(event); + WriteEvent(config, event); } }); @@ -1312,10 +1355,14 @@ class TLogManager::TImpl return eventsWritten; } - void DoUpdateCategory(TLoggingCategory* category) + void DoUpdateCategory( + const TLogManagerConfigPtr& config, + TLoggingCategory* category) { + VERIFY_THREAD_AFFINITY_ANY(); + auto minPlainTextLevel = ELogLevel::Maximum; - for (const auto& rule : Config_->Rules) { + for (const auto& rule : config->Rules) { if (rule->IsApplicable(category->Name, ELogFamily::PlainText)) { minPlainTextLevel = std::min(minPlainTextLevel, rule->MinLevel); } @@ -1323,11 +1370,13 @@ class 
TLogManager::TImpl category->MinPlainTextLevel.store(minPlainTextLevel, std::memory_order::relaxed); category->CurrentVersion.store(GetVersion(), std::memory_order::relaxed); - category->StructuredValidationSamplingRate.store(Config_->StructuredValidationSamplingRate, std::memory_order::relaxed); + category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed); } void DoRegisterAnchor(TLoggingAnchor* anchor) { + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + // NB: Duplicates are not desirable but possible. AnchorMap_.emplace(anchor->AnchorMessage, anchor); anchor->NextAnchor = FirstAnchor_; @@ -1357,21 +1406,23 @@ class TLogManager::TImpl DECLARE_THREAD_AFFINITY_SLOT(LoggingThread); - // Configuration. + TAtomicIntrusivePtr Config_; + + // Protects the section of members below. NThreading::TForkAwareSpinLock SpinLock_; - // Version forces this very module's Logger object to update to our own - // default configuration (default level etc.). - std::atomic Version_ = 0; - std::atomic AbortOnAlert_ = false; - TLogManagerConfigPtr Config_; - std::atomic ConfiguredFromEnv_ = false; THashMap> NameToCategory_; THashMap TypeNameToWriterFactory_; - const TLoggingCategory* SystemCategory_; - // These are just copies from Config_. + + // Incrementing version forces loggers to update their own default configuration (default level etc.). + std::atomic Version_ = 0; + + std::atomic ConfiguredFromEnv_ = false; + + // These are just cached (for performance reasons) copies from Config_. // The values are being read from arbitrary threads but stale values are fine. 
std::atomic HighBacklogWatermark_ = Max(); std::atomic LowBacklogWatermark_ = Max(); + std::atomic AbortOnAlert_ = false; std::atomic InitializationStarted_ = false; std::atomic InitializerThreadId_ = NThreading::InvalidThreadId; @@ -1409,7 +1460,9 @@ class TLogManager::TImpl THashMap NameToWriter_; THashMap> KeyToCachedWriter_; + const std::vector SystemWriters_; + const TLoggingCategory* SystemCategory_; std::atomic ReopenRequested_ = false; std::atomic ShutdownRequested_ = false;