Add logs when replication stream is paused due to flow control (#6254)
## What changed?
<!-- Describe what has changed in this PR -->
1. Add logs when the replication stream is paused due to flow control
2. Change the backpressure default threshold from 50 to 500
## Why?
<!-- Tell your future self why have you made these changes -->
1. To get more visibility into when backpressure kicks in
2. To make backpressure kick in less often
## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
n/a
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risk; the change is gated by a dynamic config setting.
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
yes

---------

Co-authored-by: Lanie Hei <[email protected]>
xwduan and laniehei authored Jul 9, 2024
1 parent 7779fd7 commit 6a11122
Showing 6 changed files with 24 additions and 6 deletions.
2 changes: 1 addition & 1 deletion common/dynamicconfig/constants.go
@@ -2137,7 +2137,7 @@ that task will be sent to DLQ.`,
 	)
 	ReplicationReceiverMaxOutstandingTaskCount = NewGlobalIntSetting(
 		"history.ReplicationReceiverMaxOutstandingTaskCount",
-		50,
+		500,
 		`Maximum number of outstanding tasks allowed for a single shard in the stream receiver`,
 	)
 	ReplicationResendMaxBatchCount = NewGlobalIntSetting(
12 changes: 10 additions & 2 deletions service/history/replication/stream_receiver.go
@@ -262,15 +262,23 @@ func (r *StreamReceiverImpl) ackMessage(
 		r.logger.Warn("Tiered stack mode. Have to wait for both high and low priority tracker received at least one batch of tasks before acking.")
 		return 0, nil
 	}
+	highPriorityFlowControlCommand := r.flowController.GetFlowControlInfo(enums.TASK_PRIORITY_HIGH)
+	if highPriorityFlowControlCommand == enums.REPLICATION_FLOW_CONTROL_COMMAND_PAUSE {
+		r.logger.Warn(fmt.Sprintf("pausing High Priority Tasks, current size: %v, lowWatermark: %v", r.highPriorityTaskTracker.Size(), highPriorityWaterMarkInfo.Watermark))
+	}
 	highPriorityWatermark = &replicationpb.ReplicationState{
 		InclusiveLowWatermark:     highPriorityWaterMarkInfo.Watermark,
 		InclusiveLowWatermarkTime: timestamppb.New(highPriorityWaterMarkInfo.Timestamp),
-		FlowControlCommand:        r.flowController.GetFlowControlInfo(enums.TASK_PRIORITY_HIGH),
+		FlowControlCommand:        highPriorityFlowControlCommand,
 	}
+	lowPriorityFlowControlCommand := r.flowController.GetFlowControlInfo(enums.TASK_PRIORITY_LOW)
+	if lowPriorityFlowControlCommand == enums.REPLICATION_FLOW_CONTROL_COMMAND_PAUSE {
+		r.logger.Warn(fmt.Sprintf("pausing Low Priority Tasks, current size: %v, lowWatermark: %v", r.lowPriorityTaskTracker.Size(), lowPriorityWaterMarkInfo.Watermark))
+	}
 	lowPriorityWatermark = &replicationpb.ReplicationState{
 		InclusiveLowWatermark:     lowPriorityWaterMarkInfo.Watermark,
 		InclusiveLowWatermarkTime: timestamppb.New(lowPriorityWaterMarkInfo.Timestamp),
-		FlowControlCommand:        r.flowController.GetFlowControlInfo(enums.TASK_PRIORITY_LOW),
+		FlowControlCommand:        lowPriorityFlowControlCommand,
 	}
 	if highPriorityWaterMarkInfo.Watermark <= lowPriorityWaterMarkInfo.Watermark {
 		inclusiveLowWaterMark = highPriorityWaterMarkInfo.Watermark
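For orientation: the PAUSE command logged above comes from the receiver's flow controller comparing a shard's outstanding task count against `history.ReplicationReceiverMaxOutstandingTaskCount` (now defaulting to 500). A minimal sketch of that decision, using a hypothetical helper `flowControlCommand` rather than the server's actual implementation:

```go
package replication

import (
	enumsspb "go.temporal.io/server/api/enums/v1"
)

// flowControlCommand is an illustrative helper, not part of this PR: once a
// shard's outstanding task count reaches the configured maximum, the receiver
// asks the sender to pause; otherwise it signals it to resume.
func flowControlCommand(outstanding, maxOutstanding int) enumsspb.ReplicationFlowControlCommand {
	if outstanding >= maxOutstanding {
		return enumsspb.REPLICATION_FLOW_CONTROL_COMMAND_PAUSE
	}
	return enumsspb.REPLICATION_FLOW_CONTROL_COMMAND_RESUME
}
```

Raising the default from 50 to 500 widens that window tenfold, which is why backpressure, and with it the new warning logs, should fire less often.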
@@ -59,6 +59,9 @@ func (f *flowControlTestSuite) SetupTest() {
 	}
 
 	f.config = tests.NewDynamicConfig()
+	f.config.ReplicationReceiverMaxOutstandingTaskCount = func() int {
+		return 50
+	}
 	f.controller = NewReceiverFlowControl(signals, f.config)
 	f.maxOutStandingTasks = f.config.ReplicationReceiverMaxOutstandingTaskCount()
 }
4 changes: 2 additions & 2 deletions service/history/replication/stream_receiver_test.go
@@ -261,8 +261,8 @@ func (s *streamReceiverSuite) TestAckMessage_SyncStatus_ReceiverModeTieredStack(
 	s.lowPriorityTaskTracker.EXPECT().LowWatermark().Return(lowWatermarkInfo)
 	s.receiverFlowController.EXPECT().GetFlowControlInfo(enumsspb.TASK_PRIORITY_HIGH).Return(enumsspb.REPLICATION_FLOW_CONTROL_COMMAND_RESUME)
 	s.receiverFlowController.EXPECT().GetFlowControlInfo(enumsspb.TASK_PRIORITY_LOW).Return(enumsspb.REPLICATION_FLOW_CONTROL_COMMAND_PAUSE)
-	s.highPriorityTaskTracker.EXPECT().Size().Return(0)
-	s.lowPriorityTaskTracker.EXPECT().Size().Return(0)
+	s.highPriorityTaskTracker.EXPECT().Size().Return(0).AnyTimes()
+	s.lowPriorityTaskTracker.EXPECT().Size().Return(0).AnyTimes()
 	_, err := s.streamReceiver.ackMessage(s.stream)
 	s.NoError(err)
 	s.Equal([]*adminservice.StreamWorkflowReplicationMessagesRequest{{
1 change: 1 addition & 0 deletions service/history/replication/stream_sender.go
@@ -96,6 +96,7 @@ func NewStreamSender(
 		shardContext.GetLogger(),
 		tag.TargetCluster(clientClusterName), // client is the target cluster (passive cluster)
 		tag.TargetShardID(clientShardKey.ShardID),
+		tag.ShardID(serverShardKey.ShardID), // server is the source cluster (active cluster)
 	)
 	return &StreamSenderImpl{
 		server: server,
8 changes: 7 additions & 1 deletion service/history/replication/stream_sender_flow_controller.go
@@ -28,7 +28,9 @@ package replication
 
 import (
 	"context"
+	"fmt"
 	"sync"
+	"time"
 
 	"go.temporal.io/server/api/enums/v1"
 	replicationpb "go.temporal.io/server/api/replication/v1"
@@ -112,7 +114,9 @@ func (s *SenderFlowControllerImpl) setState(state *flowControlState, flowControl
 func (s *SenderFlowControllerImpl) Wait(priority enums.TaskPriority) {
 	state, ok := s.flowControlStates[priority]
 	waitForRateLimiter := func(rateLimiter quotas.RateLimiter) {
-		err := rateLimiter.Wait(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) // to avoid infinite wait
+		defer cancel()
+		err := rateLimiter.Wait(ctx)
 		if err != nil {
 			s.logger.Error("error waiting for rate limiter", tag.Error(err))
 		}
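The two-minute bound above guards against a permanently throttled limiter blocking the sender goroutine forever. A self-contained sketch of the same pattern, assuming `quotas.RateLimiter` behaves like `golang.org/x/time/rate.Limiter` (the names below are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// A nearly stalled limiter: one burst token, then ~1000s per refill.
	limiter := rate.NewLimiter(rate.Limit(0.001), 1)
	limiter.Allow() // drain the only token

	// Bound the wait so the caller cannot block indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := limiter.Wait(ctx); err != nil {
		// Wait fails fast once it sees the deadline cannot be met.
		fmt.Println("wait aborted:", err)
	}
}
```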
@@ -126,7 +130,9 @@ func (s *SenderFlowControllerImpl) Wait(priority enums.TaskPriority) {
 		state.mu.Lock()
 		if !state.resume {
 			state.waiters++
+			s.logger.Info(fmt.Sprintf("%v sender is paused", priority.String()))
 			state.cond.Wait()
+			s.logger.Info(fmt.Sprintf("%s sender is resumed", priority.String()))
 			state.waiters--
 		}
 		state.mu.Unlock()
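The pause/resume logging above sits on a standard `sync.Cond` pattern: paused senders block in `Wait` until `setState` flips the resume flag and broadcasts. A self-contained sketch of that pattern under assumed names (`pauseState` is illustrative, not the server's type):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pauseState is an illustrative stand-in for the sender's per-priority state.
type pauseState struct {
	mu     sync.Mutex
	cond   *sync.Cond
	resume bool
}

func newPauseState() *pauseState {
	s := &pauseState{resume: true}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// wait blocks while paused; cond.Wait releases mu while blocked and
// reacquires it on wake-up. The loop guards against spurious wake-ups.
func (s *pauseState) wait() {
	s.mu.Lock()
	if !s.resume {
		fmt.Println("sender is paused")
		for !s.resume {
			s.cond.Wait()
		}
		fmt.Println("sender is resumed")
	}
	s.mu.Unlock()
}

// set flips the state and wakes every blocked sender on resume.
func (s *pauseState) set(resume bool) {
	s.mu.Lock()
	s.resume = resume
	s.mu.Unlock()
	if resume {
		s.cond.Broadcast()
	}
}

func main() {
	s := newPauseState()
	s.set(false)
	go func() {
		time.Sleep(100 * time.Millisecond)
		s.set(true) // resume after a short pause
	}()
	s.wait() // blocks until the goroutine above resumes the state
}
```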
