Skip to content

Commit bce3692

Browse files
feng-yclaude
andcommitted
feat(backup_request): add rate-limited backup request policy (#3228)
Add ratio-based rate limiting for backup requests to prevent backup request storms under high QPS or downstream latency spikes.

Design:
- BackupRequestPolicy interface unchanged (ABI stable)
- BackupRateLimiter: standalone statistics module tracking backup/total ratio within a sliding time window using bvar counters
- RateLimitedBackupPolicy: internal implementation composing BackupRateLimiter, hidden in .cpp
- CreateRateLimitedBackupPolicy() factory function in header
- ChannelOptions.backup_request_max_ratio: per-channel configuration
  Priority: backup_request_policy > backup_request_max_ratio > backup_request_ms
- Channel auto-creates internal policy when max_ratio > 0 and no user policy; uses max_ratio > 0 as ownership marker for cleanup
- 3 gflags with validators: backup_request_max_ratio, backup_request_ratio_window_size_s, backup_request_ratio_update_interval_s

Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent aa784b8 commit bce3692

File tree

5 files changed

+451
-4
lines changed

5 files changed

+451
-4
lines changed

src/brpc/backup_request_policy.cpp

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "brpc/backup_request_policy.h"
19+
20+
#include <new> // std::nothrow
21+
#include <gflags/gflags.h>
22+
#include "butil/logging.h"
23+
#include "brpc/reloadable_flags.h"
24+
#include "bvar/reducer.h"
25+
#include "bvar/window.h"
26+
#include "butil/atomicops.h"
27+
#include "butil/time.h"
28+
29+
namespace brpc {
30+
31+
DEFINE_double(backup_request_max_ratio, -1,
32+
"Maximum ratio of backup requests to total requests. "
33+
"Value in (0, 1] enables rate limiting. Values <= 0 disable it "
34+
"(-1 is default). Can be overridden per-channel via "
35+
"ChannelOptions.backup_request_max_ratio. "
36+
"Note: takes effect at Channel::Init() time; changing this flag "
37+
"at runtime does not affect already-created channels.");
38+
39+
static bool validate_backup_request_max_ratio(const char*, double v) {
40+
if (v <= 0) return true; // non-positive means disabled
41+
if (v <= 1.0) return true;
42+
LOG(ERROR) << "Invalid backup_request_max_ratio=" << v
43+
<< ", must be <= 0 (disabled) or in (0, 1]";
44+
return false;
45+
}
46+
BRPC_VALIDATE_GFLAG(backup_request_max_ratio,
47+
validate_backup_request_max_ratio);
48+
49+
DEFINE_int32(backup_request_ratio_window_size_s, 10,
50+
"Window size in seconds for computing the backup request ratio. "
51+
"Must be in [1, 3600].");
52+
53+
static bool validate_backup_request_ratio_window_size_s(
54+
const char*, int32_t v) {
55+
if (v >= 1 && v <= 3600) return true;
56+
LOG(ERROR) << "Invalid backup_request_ratio_window_size_s=" << v
57+
<< ", must be in [1, 3600]";
58+
return false;
59+
}
60+
BRPC_VALIDATE_GFLAG(backup_request_ratio_window_size_s,
61+
validate_backup_request_ratio_window_size_s);
62+
63+
DEFINE_int32(backup_request_ratio_update_interval_s, 5,
64+
"Interval in seconds between ratio cache updates. Must be >= 1.");
65+
66+
static bool validate_backup_request_ratio_update_interval_s(
67+
const char*, int32_t v) {
68+
if (v >= 1) return true;
69+
LOG(ERROR) << "Invalid backup_request_ratio_update_interval_s=" << v
70+
<< ", must be >= 1";
71+
return false;
72+
}
73+
BRPC_VALIDATE_GFLAG(backup_request_ratio_update_interval_s,
74+
validate_backup_request_ratio_update_interval_s);
75+
76+
// Standalone statistics module for tracking backup/total request ratio
77+
// within a sliding time window. Each instance schedules two bvar::Window
78+
// sampler tasks; keep this in mind for high channel-count deployments.
79+
class BackupRateLimiter {
80+
public:
81+
BackupRateLimiter(double max_backup_ratio,
82+
int window_size_seconds,
83+
int update_interval_seconds)
84+
: _max_backup_ratio(max_backup_ratio)
85+
, _update_interval_us(update_interval_seconds * 1000000LL)
86+
, _total_window(&_total_count, window_size_seconds)
87+
, _backup_window(&_backup_count, window_size_seconds)
88+
, _cached_ratio(0.0)
89+
, _last_update_us(0) {
90+
}
91+
92+
// All atomic operations use relaxed ordering intentionally.
93+
// This is best-effort rate limiting: a slightly stale ratio is
94+
// acceptable for approximate throttling.
95+
bool ShouldAllow() const {
96+
const int64_t now_us = butil::cpuwide_time_us();
97+
int64_t last_us = _last_update_us.load(butil::memory_order_relaxed);
98+
double ratio = _cached_ratio.load(butil::memory_order_relaxed);
99+
100+
if (now_us - last_us >= _update_interval_us) {
101+
if (_last_update_us.compare_exchange_strong(
102+
last_us, now_us, butil::memory_order_relaxed)) {
103+
int64_t total = _total_window.get_value();
104+
int64_t backup = _backup_window.get_value();
105+
ratio = (total > 0) ? static_cast<double>(backup) / total : 0.0;
106+
_cached_ratio.store(ratio, butil::memory_order_relaxed);
107+
}
108+
}
109+
110+
// max_backup_ratio >= 1.0 means no limit (ratio cannot exceed 1.0).
111+
bool allow = _max_backup_ratio >= 1.0 || ratio < _max_backup_ratio;
112+
if (allow) {
113+
// Count backup decisions immediately for faster feedback
114+
// during latency spikes (before RPCs complete).
115+
_backup_count << 1;
116+
}
117+
return allow;
118+
}
119+
120+
void OnRPCEnd(const Controller* /*controller*/) {
121+
// Count all completed RPCs. Backup decisions are counted
122+
// in ShouldAllow() at decision time for faster feedback.
123+
_total_count << 1;
124+
}
125+
126+
private:
127+
double _max_backup_ratio;
128+
int64_t _update_interval_us;
129+
130+
bvar::Adder<int64_t> _total_count;
131+
mutable bvar::Adder<int64_t> _backup_count;
132+
bvar::Window<bvar::Adder<int64_t>> _total_window;
133+
bvar::Window<bvar::Adder<int64_t>> _backup_window;
134+
135+
mutable butil::atomic<double> _cached_ratio;
136+
mutable butil::atomic<int64_t> _last_update_us;
137+
};
138+
139+
// Internal BackupRequestPolicy that composes a BackupRateLimiter
140+
// for ratio-based suppression.
141+
class RateLimitedBackupPolicy : public BackupRequestPolicy {
142+
public:
143+
RateLimitedBackupPolicy(int32_t backup_request_ms,
144+
double max_backup_ratio,
145+
int window_size_seconds,
146+
int update_interval_seconds)
147+
: _backup_request_ms(backup_request_ms)
148+
, _rate_limiter(max_backup_ratio, window_size_seconds,
149+
update_interval_seconds) {
150+
}
151+
152+
int32_t GetBackupRequestMs(const Controller* /*controller*/) const override {
153+
return _backup_request_ms;
154+
}
155+
156+
bool DoBackup(const Controller* /*controller*/) const override {
157+
return _rate_limiter.ShouldAllow();
158+
}
159+
160+
void OnRPCEnd(const Controller* controller) override {
161+
_rate_limiter.OnRPCEnd(controller);
162+
}
163+
164+
private:
165+
int32_t _backup_request_ms;
166+
BackupRateLimiter _rate_limiter;
167+
};
168+
169+
// Creates a heap-allocated BackupRequestPolicy that limits the ratio of
// backup requests to total requests. The caller owns the returned pointer.
//
// Parameters (each is validated; a violation is logged and NULL returned):
//   backup_request_ms:       >= 0
//   max_backup_ratio:        (0, 1]; NaN is rejected
//   window_size_seconds:     [1, 3600]
//   update_interval_seconds: >= 1
//
// Returns NULL on invalid parameters or allocation failure.
BackupRequestPolicy* CreateRateLimitedBackupPolicy(
        int32_t backup_request_ms,
        double max_backup_ratio,
        int window_size_seconds,
        int update_interval_seconds) {
    if (backup_request_ms < 0) {
        LOG(ERROR) << "Invalid backup_request_ms=" << backup_request_ms
                   << ", must be >= 0";
        return NULL;
    }
    // Written as a negated in-range test so that NaN is rejected as well:
    // NaN compares false against everything, so the usual
    // `x <= 0 || x > 1.0` form would let it through and produce a policy
    // whose limiter never allows a backup.
    if (!(max_backup_ratio > 0 && max_backup_ratio <= 1.0)) {
        LOG(ERROR) << "Invalid max_backup_ratio=" << max_backup_ratio
                   << ", must be in (0, 1]";
        return NULL;
    }
    if (window_size_seconds < 1 || window_size_seconds > 3600) {
        LOG(ERROR) << "Invalid window_size_seconds=" << window_size_seconds
                   << ", must be in [1, 3600]";
        return NULL;
    }
    if (update_interval_seconds < 1) {
        LOG(ERROR) << "Invalid update_interval_seconds="
                   << update_interval_seconds << ", must be >= 1";
        return NULL;
    }
    // nothrow new: allocation failure is reported through the NULL return
    // instead of an exception.
    RateLimitedBackupPolicy* policy = new (std::nothrow) RateLimitedBackupPolicy(
        backup_request_ms, max_backup_ratio,
        window_size_seconds, update_interval_seconds);
    if (!policy) {
        LOG(ERROR) << "Fail to allocate RateLimitedBackupPolicy";
    }
    return policy;
}
202+
203+
} // namespace brpc

src/brpc/backup_request_policy.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@ class BackupRequestPolicy {
3838
virtual void OnRPCEnd(const Controller* controller) = 0;
3939
};
4040

41+
// Create a BackupRequestPolicy that limits the ratio of backup requests
42+
// to total requests within a sliding time window. When the ratio reaches
43+
// or exceeds max_backup_ratio, DoBackup() returns false.
44+
// NOTE: Backup decisions are counted immediately at DoBackup() time for
45+
// fast feedback. Total RPCs are counted on completion (OnRPCEnd). During
46+
// latency spikes the ratio may temporarily lag until RPCs complete.
47+
// Returns NULL on invalid parameters or allocation failure.
48+
// backup_request_ms: >= 0
49+
// max_backup_ratio: (0, 1]
50+
// window_size_seconds: [1, 3600]
51+
// update_interval_seconds: >= 1
52+
// The caller owns the returned pointer.
53+
BackupRequestPolicy* CreateRateLimitedBackupPolicy(
54+
int32_t backup_request_ms,
55+
double max_backup_ratio,
56+
int window_size_seconds,
57+
int update_interval_seconds);
58+
4159
}
4260

4361
#endif // BRPC_BACKUP_REQUEST_POLICY_H

src/brpc/channel.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <google/protobuf/descriptor.h>
2121
#include <gflags/gflags.h>
2222
#include <memory>
23+
#include "butil/memory/scope_guard.h"
2324
#include "butil/time.h" // milliseconds_from_now
2425
#include "butil/logging.h"
2526
#include "butil/third_party/murmurhash3/murmurhash3.h"
@@ -43,6 +44,9 @@ namespace brpc {
4344

4445
DECLARE_bool(enable_rpcz);
4546
DECLARE_bool(usercode_in_pthread);
47+
DECLARE_double(backup_request_max_ratio);
48+
DECLARE_int32(backup_request_ratio_window_size_s);
49+
DECLARE_int32(backup_request_ratio_update_interval_s);
4650
DEFINE_string(health_check_path, "", "Http path of health check call."
4751
"By default health check succeeds if the server is connectable."
4852
"If this flag is set, health check is not completed until a http "
@@ -63,6 +67,7 @@ ChannelOptions::ChannelOptions()
6367
, log_succeed_without_server(true)
6468
, socket_mode(SOCKET_MODE_TCP)
6569
, auth(NULL)
70+
, backup_request_max_ratio(-1)
6671
, backup_request_policy(NULL)
6772
, retry_policy(NULL)
6873
, ns_filter(NULL)
@@ -164,6 +169,7 @@ Channel::Channel(ProfilerLinker)
164169
, _serialize_request(NULL)
165170
, _pack_request(NULL)
166171
, _get_method_name(NULL)
172+
, _owns_backup_policy(false)
167173
, _preferred_index(-1) {
168174
}
169175

@@ -172,12 +178,45 @@ Channel::~Channel() {
172178
const ChannelSignature sig = ComputeChannelSignature(_options);
173179
SocketMapRemove(SocketMapKey(_server_address, sig));
174180
}
181+
// Delete internally-created backup policy. Like user-provided
182+
// backup_request_policy, the caller must ensure no async RPCs are
183+
// in-flight when the Channel is destroyed.
184+
if (_owns_backup_policy) {
185+
delete _options.backup_request_policy;
186+
}
175187
}
176188

177189

178190
int Channel::InitChannelOptions(const ChannelOptions* options) {
191+
// Save any previously created internal backup policy (re-Init case).
192+
// Deletion is deferred to the end so failed re-Init can rollback.
193+
BackupRequestPolicy* old_backup_policy = NULL;
194+
bool had_old_policy = _owns_backup_policy;
195+
if (_owns_backup_policy) {
196+
old_backup_policy = _options.backup_request_policy;
197+
_options.backup_request_policy = NULL;
198+
_owns_backup_policy = false;
199+
}
200+
201+
// On failure, rollback old policy. On success, delete it.
202+
bool init_success = false;
203+
BRPC_SCOPE_EXIT {
204+
if (!init_success && had_old_policy) {
205+
_options.backup_request_policy = old_backup_policy;
206+
_owns_backup_policy = true;
207+
} else {
208+
delete old_backup_policy;
209+
}
210+
};
211+
179212
if (options) { // Override default options if user provided one.
180213
_options = *options;
214+
// If the incoming options reused the old internal policy pointer,
215+
// treat it as NULL since it will be deleted on success.
216+
if (_options.backup_request_policy == old_backup_policy &&
217+
old_backup_policy != NULL) {
218+
_options.backup_request_policy = NULL;
219+
}
181220
}
182221
const Protocol* protocol = FindProtocol(_options.protocol);
183222
if (NULL == protocol || !protocol->support_client()) {
@@ -242,6 +281,37 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
242281
if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
243282
butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
244283
}
284+
285+
// Create rate-limited backup policy if configured.
286+
// Per-channel option takes precedence over the global gflag.
287+
double max_ratio = _options.backup_request_max_ratio;
288+
if (max_ratio < 0) {
289+
max_ratio = FLAGS_backup_request_max_ratio;
290+
}
291+
if (max_ratio > 1.0) {
292+
LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
293+
<< " is out of range (0, 1], clamped to 1.0";
294+
max_ratio = 1.0;
295+
}
296+
// User-provided backup_request_policy takes precedence.
297+
if (_options.backup_request_policy != NULL && max_ratio > 0) {
298+
LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
299+
<< " is ignored because backup_request_policy is already set";
300+
}
301+
if (max_ratio > 0 && _options.backup_request_policy == NULL &&
302+
_options.backup_request_ms >= 0) {
303+
BackupRequestPolicy* policy = CreateRateLimitedBackupPolicy(
304+
_options.backup_request_ms, max_ratio,
305+
FLAGS_backup_request_ratio_window_size_s,
306+
FLAGS_backup_request_ratio_update_interval_s);
307+
if (policy) {
308+
_options.backup_request_policy = policy;
309+
_owns_backup_policy = true;
310+
} else {
311+
LOG(ERROR) << "Fail to create rate-limited backup policy";
312+
}
313+
}
314+
init_success = true;
245315
return 0;
246316
}
247317

src/brpc/channel.h

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,26 @@ struct ChannelOptions {
116116
// Default: NULL
117117
const Authenticator* auth;
118118

119+
// Maximum ratio of backup requests to total requests within a sliding
120+
// time window. When the ratio reaches or exceeds this value, backup
121+
// requests are suppressed.
122+
// Value in (0, 1] enables rate limiting. -1 (default) uses global gflag
123+
// FLAGS_backup_request_max_ratio. 0 explicitly disables rate limiting.
124+
// Only effective when backup_request_ms >= 0 and backup_request_policy
125+
// is NULL (i.e. no custom policy). When effective, an internal
126+
// rate-limited BackupRequestPolicy is created and used automatically.
127+
// Default: -1 (use FLAGS_backup_request_max_ratio)
128+
double backup_request_max_ratio;
129+
119130
// Customize the backup request time and whether to send backup request.
120-
// Priority: `backup_request_policy' > `backup_request_ms'.
121-
// Overridable by Controller.set_backup_request_ms() or
122-
// Controller.set_backup_request_policy().
123-
// This object is NOT owned by channel and should remain valid when channel is used.
131+
// Priority: `backup_request_policy' > `backup_request_max_ratio' > `backup_request_ms'.
132+
// Overridable per-RPC by Controller.set_backup_request_ms() or
133+
// Controller.set_backup_request_policy(). Note: per-RPC override
134+
// replaces the entire channel-level backup config including any
135+
// internal rate-limited policy created by backup_request_max_ratio.
136+
// When user-supplied, this object is NOT owned by channel and should
137+
// remain valid during channel's lifetime. When backup_request_max_ratio
138+
// creates an internal policy, that policy IS owned by channel.
124139
// Default: NULL
125140
BackupRequestPolicy* backup_request_policy;
126141

@@ -263,6 +278,7 @@ friend class SelectiveChannel;
263278
// the RPC above has finished
264279
butil::intrusive_ptr<SharedLoadBalancer> _lb;
265280
ChannelOptions _options;
281+
bool _owns_backup_policy;
266282
int _preferred_index;
267283
};
268284

0 commit comments

Comments
 (0)