Support backup request policy (#2734)
* Support backup request policy

* Support Controller::set_backup_request_policy

* Pass Controller to GetBackupRequestMs and update cn/client.md

* Feedback call info

* Avoid blocking the timer thread in HandleSocketFailed
chenBright committed Sep 26, 2024
1 parent c727dd7 commit bb284cf
Showing 7 changed files with 254 additions and 13 deletions.
33 changes: 29 additions & 4 deletions docs/cn/client.md
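@@ -584,10 +584,6 @@ Since r34717, Controller.has_backup_request() reports whether a backup request was sent.

If the server never responds but the connection is fine, the RPC is not retried. If you need to send another request after a certain amount of time, use a backup request.

It works as follows: if the response does not come back within backup_request_ms, another request is sent, and whichever response arrives first is taken. The new request is sent to a different server on a best-effort basis. Note that if backup_request_ms is greater than or equal to the timeout, the backup request is never sent. A backup request consumes one retry. A backup request does not imply server-side cancellation.

ChannelOptions.backup_request_ms affects all RPCs on the Channel; the unit is milliseconds and the default is -1 (disabled). Controller.set_backup_request_ms() overrides the value for a single RPC.

### The timeout has not been reached

After the timeout, the RPC ends as soon as possible.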
@@ -708,6 +704,35 @@ options.retry_policy = &g_my_retry_policy;
- [brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h) (fixed-interval backoff policy) and [brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h) (jittered-interval backoff policy) inherit from [brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h) and use the framework's default DoRetry.
- Backing off before a retry in a pthread (implemented with bthread_usleep) blocks that pthread, so by default no retry backoff is performed on pthreads.

### backup request

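It works as follows: if the response does not come back within backup_request_ms, another request is sent, and whichever response arrives first is taken. The new request is sent to a different server on a best-effort basis. Note that if backup_request_ms is greater than or equal to the timeout, the backup request is never sent. A backup request consumes one retry. A backup request does not imply server-side cancellation.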

ChannelOptions.backup_request_ms affects all RPCs on the Channel; the unit is milliseconds and the default is -1 (disabled). Controller.set_backup_request_ms() overrides the value for a single RPC.
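
The timing rule described above can be sketched as a small self-contained model. This is not brpc code: `BackupRequestFires` is a hypothetical helper written only for illustration. It assumes that a negative value means "disabled" or "not set" for both settings, and it takes the `>=` comparison from the ChannelOptions comment in `channel.h`.

```c++
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of brpc): returns true if a backup request
// would be sent for an RPC whose first response needs `latency_ms` to arrive.
// Assumptions: backup_request_ms < 0 means disabled, timeout_ms < 0 means
// no timeout is set.
inline bool BackupRequestFires(int32_t timeout_ms, int32_t backup_request_ms,
                               int32_t latency_ms) {
    assert(latency_ms >= 0);
    if (backup_request_ms < 0) {
        return false;  // backup request is disabled (the default is -1)
    }
    if (timeout_ms >= 0 && backup_request_ms >= timeout_ms) {
        return false;  // the RPC times out before the backup delay elapses
    }
    // The backup request is sent only if the response is still missing
    // when the backup delay elapses.
    return latency_ms > backup_request_ms;
}
```

With a 100 ms timeout and a 20 ms backup delay, a response that takes 50 ms triggers a backup request, while one that takes 10 ms does not.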

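Users can customize the policy (backup_request_ms and circuit-breaking of backup requests) by inheriting from [brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h). For example, adjust backup_request_ms according to latency, or circuit-break some backup requests according to the error rate.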

ChannelOptions.backup_request_policy likewise affects all RPCs on the Channel. Controller.set_backup_request_policy() overrides the policy for a single RPC. backup_request_policy takes priority over backup_request_ms.
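
The override and priority rules can be modelled in a few lines. The sketch below mirrors the logic in this commit's `channel.cpp` and `controller.cpp` hunks, but every name in it (`Policy`, `Settings`, `kUnset`, `EffectiveBackupRequestMs`) is a stand-in invented for illustration, not a brpc type. In particular, `kUnset` only plays the role of `UNSET_MAGIC_NUM`, and `Policy` holds a fixed delay instead of a virtual `GetBackupRequestMs()`.

```c++
#include <cassert>
#include <cstdint>

// Stand-in for a backup request policy: it just carries the delay it returns.
struct Policy {
    int32_t backup_request_ms;
};

// Hypothetical "not set" marker, playing the role of UNSET_MAGIC_NUM.
const int32_t kUnset = INT32_MIN;

// The two backup-request settings, as held by an RPC or by a Channel.
struct Settings {
    int32_t backup_request_ms;
    const Policy* policy;  // not owned; nullptr means no policy
};

inline int32_t EffectiveBackupRequestMs(Settings rpc, const Settings& channel) {
    // The Channel's settings apply only if the RPC set neither value.
    if (rpc.backup_request_ms == kUnset && rpc.policy == nullptr) {
        rpc = channel;
    }
    // A policy, when present, takes priority over the plain delay.
    return rpc.policy != nullptr ? rpc.policy->backup_request_ms
                                 : rpc.backup_request_ms;
}
```

One consequence of this rule is that an RPC that sets only `backup_request_ms` does not pick up the Channel's policy.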

The brpc::BackupRequestPolicy interface is as follows:

```c++
class BackupRequestPolicy {
public:
    virtual ~BackupRequestPolicy() = default;

    // Return the time in milliseconds in which another request
    // will be sent if RPC does not finish.
    virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;

    // Return true if the backup request should be sent.
    virtual bool DoBackup(const Controller* controller) const = 0;

    // Called when an RPC ends; users can collect call information to adjust the policy.
    virtual void OnRPCEnd(const Controller* controller) = 0;
};
```
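
As an illustration of the interface, here is a minimal sketch of a policy that uses a fixed delay and stops sending backup requests while the observed error rate is too high. So that it compiles without brpc, it declares stand-ins for `Controller` and `BackupRequestPolicy`; with brpc you would include `brpc/backup_request_policy.h` and derive from `brpc::BackupRequestPolicy` instead. `ErrorRateBackupPolicy` and the `failed` field are hypothetical names, and the counters never decay, which a production policy would probably replace with a sliding window.

```c++
#include <atomic>
#include <cassert>
#include <cstdint>

// Stand-in for brpc::Controller: only carries whether the RPC failed.
struct Controller {
    bool failed;
};

// Stand-in copy of the interface shown above.
class BackupRequestPolicy {
public:
    virtual ~BackupRequestPolicy() = default;
    virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;
    virtual bool DoBackup(const Controller* controller) const = 0;
    virtual void OnRPCEnd(const Controller* controller) = 0;
};

// Hypothetical policy: fixed backup delay, and no backup requests while the
// fraction of failed RPCs seen so far exceeds `max_error_rate`.
class ErrorRateBackupPolicy : public BackupRequestPolicy {
public:
    ErrorRateBackupPolicy(int32_t backup_ms, double max_error_rate)
        : _backup_ms(backup_ms), _max_error_rate(max_error_rate) {
        assert(max_error_rate >= 0.0 && max_error_rate <= 1.0);
    }

    int32_t GetBackupRequestMs(const Controller*) const override {
        return _backup_ms;
    }

    bool DoBackup(const Controller*) const override {
        const int64_t total = _total.load(std::memory_order_relaxed);
        if (total == 0) {
            return true;  // nothing observed yet, allow backup requests
        }
        const int64_t failed = _failed.load(std::memory_order_relaxed);
        return static_cast<double>(failed) / total <= _max_error_rate;
    }

    void OnRPCEnd(const Controller* controller) override {
        // Atomics are used on the assumption that RPCs sharing one policy
        // object may finish concurrently.
        _total.fetch_add(1, std::memory_order_relaxed);
        if (controller->failed) {
            _failed.fetch_add(1, std::memory_order_relaxed);
        }
    }

private:
    const int32_t _backup_ms;
    const double _max_error_rate;
    std::atomic<int64_t> _total{0};
    std::atomic<int64_t> _failed{0};
};
```

Such an object would be installed through `ChannelOptions.backup_request_policy` or `Controller.set_backup_request_policy()`. Because the policy is not owned by the Channel, it has to outlive the Channel or RPC that uses it.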
### Retries should be conservative
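Because of cost constraints, most online servers have limited redundancy, mainly enough to satisfy mutual backup across data centers. Aggressive retry logic can easily make many clients put 2-3 times the normal load on a server cluster and eventually cause the cluster to avalanche: servers cannot keep up, queues grow longer and longer, and every request waits in a long queue before being processed and finally times out, which amounts to a service outage. The default retry behavior is relatively safe: as long as the connection is not broken the RPC is not retried, so large numbers of retry requests are generally not produced. Users can customize retrying through RetryPolicy, but doing so may also turn retries into a "storm". When you customize RetryPolicy, carefully consider how the client and server cooperate, and design corresponding fault tests to make sure the behavior is as expected.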
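
To make the load amplification concrete, the following sketch computes how many requests a cluster sees per logical RPC under a deliberately simple model. `ExpectedAttempts` is a hypothetical helper, not a brpc function. It assumes every attempt fails independently with the same probability and that every failure is retried until the retry budget is used up, which corresponds to an aggressive custom RetryPolicy rather than brpc's default behavior.

```c++
#include <cassert>

// Hypothetical helper: expected number of attempts per logical RPC when each
// attempt fails independently with probability `fail_prob` and up to
// `max_retry` retries follow the first attempt.
inline double ExpectedAttempts(double fail_prob, int max_retry) {
    assert(fail_prob >= 0.0 && fail_prob <= 1.0 && max_retry >= 0);
    double attempts = 0.0;
    double reach_prob = 1.0;  // probability that attempt k is made at all
    for (int k = 0; k <= max_retry; ++k) {
        attempts += reach_prob;
        reach_prob *= fail_prob;  // attempt k+1 happens only if attempt k failed
    }
    return attempts;
}
```

Under this model, once an overloaded cluster fails every attempt (`fail_prob` of 1.0), a `max_retry` of 2 triples the load, which is how the extra retries deepen the overload they react to.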
43 changes: 43 additions & 0 deletions src/brpc/backup_request_policy.h
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef BRPC_BACKUP_REQUEST_POLICY_H
#define BRPC_BACKUP_REQUEST_POLICY_H

#include "brpc/controller.h"

namespace brpc {

class BackupRequestPolicy {
public:
    virtual ~BackupRequestPolicy() = default;

    // Return the time in milliseconds in which another request
    // will be sent if RPC does not finish.
    virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;

    // Return true if the backup request should be sent.
    virtual bool DoBackup(const Controller* controller) const = 0;

    // Called when an RPC ends; users can collect call information to adjust the policy.
    virtual void OnRPCEnd(const Controller* controller) = 0;
};

}

#endif // BRPC_BACKUP_REQUEST_POLICY_H
6 changes: 5 additions & 1 deletion src/brpc/channel.cpp
@@ -55,6 +55,7 @@ ChannelOptions::ChannelOptions()
, log_succeed_without_server(true)
, use_rdma(false)
, auth(NULL)
, backup_request_policy(NULL)
, retry_policy(NULL)
, ns_filter(NULL)
{}
@@ -495,8 +496,10 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// overriding connect_timeout_ms does not make sense, just use the
// one in ChannelOptions
cntl->_connect_timeout_ms = _options.connect_timeout_ms;
if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) {
if (cntl->backup_request_ms() == UNSET_MAGIC_NUM &&
NULL == cntl->_backup_request_policy) {
cntl->set_backup_request_ms(_options.backup_request_ms);
cntl->_backup_request_policy = _options.backup_request_policy;
}
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
cntl->set_connection_type(_options.connection_type);
@@ -536,6 +539,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// Currently we cannot handle retry and backup request correctly
cntl->set_max_retry(0);
cntl->set_backup_request_ms(-1);
cntl->_backup_request_policy = NULL;
}

if (cntl->backup_request_ms() >= 0 &&
14 changes: 12 additions & 2 deletions src/brpc/channel.h
@@ -34,6 +34,7 @@
#include "brpc/controller.h" // brpc::Controller
#include "brpc/details/profiler_linker.h"
#include "brpc/retry_policy.h"
#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"

namespace brpc {
@@ -55,11 +56,12 @@ struct ChannelOptions {
int32_t timeout_ms;

// Send another request if RPC does not finish after so many milliseconds.
// Overridable by Controller.set_backup_request_ms().
// Overridable by Controller.set_backup_request_ms() or
// Controller.set_backup_request_policy().
// The request will be sent to a different server by best effort.
// If timeout_ms is set and backup_request_ms >= timeout_ms, backup request
// will never be sent.
// backup request does NOT imply server-side cancelation.
// backup request does NOT imply server-side cancellation.
// Default: -1 (disabled)
// Maximum: 0x7fffffff (roughly 24 days)
int32_t backup_request_ms;
@@ -112,6 +114,14 @@ struct ChannelOptions {
// Default: NULL
const Authenticator* auth;

// Customize the backup request time and whether to send backup request.
// Priority: `backup_request_policy' > `backup_request_ms'.
// Overridable by Controller.set_backup_request_ms() or
// Controller.set_backup_request_policy().
// This object is NOT owned by channel and should remain valid when channel is used.
// Default: NULL
BackupRequestPolicy* backup_request_policy;

// Customize the error code that should be retried. The interface is
// defined in src/brpc/retry_policy.h
// This object is NOT owned by channel and should remain valid when
28 changes: 28 additions & 0 deletions src/brpc/controller.cpp
@@ -258,6 +258,7 @@ void Controller::ResetPods() {
_connection_type = CONNECTION_TYPE_UNKNOWN;
_timeout_ms = UNSET_MAGIC_NUM;
_backup_request_ms = UNSET_MAGIC_NUM;
_backup_request_policy = NULL;
_connect_timeout_ms = UNSET_MAGIC_NUM;
_real_timeout_ms = UNSET_MAGIC_NUM;
_deadline_us = -1;
@@ -344,6 +345,16 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) {
}
}

int64_t Controller::backup_request_ms() const {
    int timeout_ms = NULL != _backup_request_policy ?
        _backup_request_policy->GetBackupRequestMs(this) : _backup_request_ms;
    if (timeout_ms > 0x7fffffff) {
        timeout_ms = 0x7fffffff;
        LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)";
    }
    return timeout_ms;
}

void Controller::set_max_retry(int max_retry) {
if (max_retry > MAX_RETRY_COUNT) {
LOG(WARNING) << "Retry count can't be larger than "
@@ -606,6 +617,13 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
goto END_OF_RPC;
}
if (_error_code == EBACKUPREQUEST) {
if (NULL != _backup_request_policy && !_backup_request_policy->DoBackup(this)) {
// No need to do backup request.
_error_code = saved_error;
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}

// Reset timeout if needed
int rc = 0;
if (timeout_ms() >= 0) {
@@ -969,6 +987,14 @@ void Controller::EndRPC(const CompletionInfo& info) {
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
}
}

void Controller::OnRPCEnd(int64_t end_time_us) {
    _end_time_us = end_time_us;
    if (NULL != _backup_request_policy) {
        _backup_request_policy->OnRPCEnd(this);
    }
}

void Controller::RunDoneInBackupThread(void* arg) {
static_cast<Controller*>(arg)->DoneInBackupThread();
}
@@ -1313,6 +1339,7 @@ CallId Controller::call_id() {
void Controller::SaveClientSettings(ClientSettings* s) const {
s->timeout_ms = _timeout_ms;
s->backup_request_ms = _backup_request_ms;
s->backup_request_policy = _backup_request_policy;
s->max_retry = _max_retry;
s->tos = _tos;
s->connection_type = _connection_type;
@@ -1325,6 +1352,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
void Controller::ApplyClientSettings(const ClientSettings& s) {
set_timeout_ms(s.timeout_ms);
set_backup_request_ms(s.backup_request_ms);
set_backup_request_policy(s.backup_request_policy);
set_max_retry(s.max_retry);
set_type_of_service(s.tos);
set_connection_type(s.connection_type);
15 changes: 10 additions & 5 deletions src/brpc/controller.h
@@ -71,6 +71,7 @@ class RPCSender;
class StreamSettings;
class MongoContext;
class RetryPolicy;
class BackupRequestPolicy;
class InputMessageBase;
class ThriftStub;
namespace policy {
@@ -180,7 +181,10 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// Set/get the delay to send backup request in milliseconds. Use
// ChannelOptions.backup_request_ms on unset.
void set_backup_request_ms(int64_t timeout_ms);
int64_t backup_request_ms() const { return _backup_request_ms; }
void set_backup_request_policy(BackupRequestPolicy* policy) {
_backup_request_policy = policy;
}
int64_t backup_request_ms() const;

// Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset.
// <=0 means no retry.
@@ -670,7 +674,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
struct ClientSettings {
int32_t timeout_ms;
int32_t backup_request_ms;
int max_retry;
BackupRequestPolicy* backup_request_policy;
int max_retry;
int32_t tos;
ConnectionType connection_type;
CompressType request_compress_type;
@@ -737,9 +742,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
_end_time_us = begin_time_us;
}

void OnRPCEnd(int64_t end_time_us) {
_end_time_us = end_time_us;
}
void OnRPCEnd(int64_t end_time_us);

static void RunDoneInBackupThread(void*);
void DoneInBackupThread();
@@ -800,6 +803,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this RPC has a retry/backup request, this variable saves the real timeout for the current call
int64_t _real_timeout_ms;
// Deadline of this RPC (since the Epoch in microseconds).