Commit eac5a84

Adding alertmanager feature flag to limit blast radius of config issue and api improvements
Signed-off-by: Brenden Wu <[email protected]>
1 parent 194fbcb commit eac5a84

File tree: 7 files changed, +311 -27 lines changed

pkg/alertmanager/alertmanager_ring.go

Lines changed: 38 additions & 10 deletions

@@ -27,29 +27,56 @@ const (
 	RingNumTokens = 128
 )
 
-// RingOp is the operation used for reading/writing to the alertmanagers.
+// Original ring operations (with extension enabled for backward compatibility)
 var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
-	// Only ACTIVE Alertmanager get requests. If instance is not ACTIVE, we need to find another Alertmanager.
+	// Original behavior: extend replica set if instance is not ACTIVE
 	return s != ring.ACTIVE
 })
 
-// SyncRingOp is the operation used for checking if a user is owned by an alertmanager.
 var SyncRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, func(s ring.InstanceState) bool {
+	// Original behavior: extend replica set if instance is not ACTIVE
 	return s != ring.ACTIVE
 })
 
+// Blast radius limited ring operations (with extension disabled)
+var RingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
+	// Never extend replica set to limit blast radius during config corruption incidents
+	return false
+})
+
+var SyncRingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, func(s ring.InstanceState) bool {
+	// Never extend replica set during sync to limit blast radius during config corruption incidents
+	return false
+})
+
+// Helper functions to select the appropriate ring operation based on config
+func GetRingOp(disableExtension bool) ring.Operation {
+	if disableExtension {
+		return RingOpNoExtension
+	}
+	return RingOp
+}
+
+func GetSyncRingOp(disableExtension bool) ring.Operation {
+	if disableExtension {
+		return SyncRingOpNoExtension
+	}
+	return SyncRingOp
+}
+
 // RingConfig masks the ring lifecycler config which contains
 // many options not really required by the alertmanager ring. This config
 // is used to strip down the config to the minimum, and avoid confusion
 // to the user.
 type RingConfig struct {
-	KVStore                kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
-	HeartbeatPeriod        time.Duration `yaml:"heartbeat_period"`
-	HeartbeatTimeout       time.Duration `yaml:"heartbeat_timeout"`
-	ReplicationFactor      int           `yaml:"replication_factor"`
-	ZoneAwarenessEnabled   bool          `yaml:"zone_awareness_enabled"`
-	TokensFilePath         string        `yaml:"tokens_file_path"`
-	DetailedMetricsEnabled bool          `yaml:"detailed_metrics_enabled"`
+	KVStore                    kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
+	HeartbeatPeriod            time.Duration `yaml:"heartbeat_period"`
+	HeartbeatTimeout           time.Duration `yaml:"heartbeat_timeout"`
+	ReplicationFactor          int           `yaml:"replication_factor"`
+	ZoneAwarenessEnabled       bool          `yaml:"zone_awareness_enabled"`
+	TokensFilePath             string        `yaml:"tokens_file_path"`
+	DetailedMetricsEnabled     bool          `yaml:"detailed_metrics_enabled"`
+	DisableReplicaSetExtension bool          `yaml:"disable_replica_set_extension"`
 
 	FinalSleep               time.Duration `yaml:"final_sleep"`
 	WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"`

@@ -90,6 +117,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
 	f.StringVar(&cfg.TokensFilePath, rfprefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
 	f.BoolVar(&cfg.DetailedMetricsEnabled, rfprefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.")
+	f.BoolVar(&cfg.DisableReplicaSetExtension, rfprefix+"disable-replica-set-extension", false, "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.")
 
 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
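Usage note (not part of the diff): GetRingOp and GetSyncRingOp are meant to be the single switch point for the new flag. A minimal sketch of how a caller selects the operation; the github.com/cortexproject/cortex import path is an assumption about the module this commit targets, and the surrounding wiring is illustrative only.

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/alertmanager"
)

func main() {
	// Flag off (the default): the original, extension-enabled operations are used,
	// so requests can spill over to extra instances when a replica is unhealthy.
	readOp := alertmanager.GetRingOp(false)    // returns alertmanager.RingOp
	syncOp := alertmanager.GetSyncRingOp(false) // returns alertmanager.SyncRingOp

	// Flag on: the no-extension variants are used, so an unhealthy instance is
	// skipped rather than replaced by the next instance on the ring.
	readOpLimited := alertmanager.GetRingOp(true)    // returns alertmanager.RingOpNoExtension
	syncOpLimited := alertmanager.GetSyncRingOp(true) // returns alertmanager.SyncRingOpNoExtension

	// Printed only so the variables are used; the values are opaque ring operations.
	fmt.Println(readOp, syncOp, readOpLimited, syncOpLimited)
}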

pkg/alertmanager/alertmanager_ring_test.go

Lines changed: 49 additions & 0 deletions

@@ -52,3 +52,52 @@ func TestIsHealthyForAlertmanagerOperations(t *testing.T) {
 		})
 	}
 }
+
+func TestBlastRadiusProtection(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		operation ring.Operation
+		instance  *ring.InstanceDesc
+		timeout   time.Duration
+		expected  bool
+	}{
+		"RingOp extends to unhealthy ACTIVE instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOpNoExtension excludes unhealthy ACTIVE instance": {
+			operation: RingOpNoExtension,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOp extends to LEAVING instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOpNoExtension excludes LEAVING instance": {
+			operation: RingOpNoExtension,
+			instance:  &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"Both operations include healthy ACTIVE instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  true,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			actual := testData.instance.IsHealthy(testData.operation, testData.timeout, time.Now())
+			assert.Equal(t, testData.expected, actual)
+		})
+	}
+}

pkg/alertmanager/distributor.go

Lines changed: 70 additions & 9 deletions

@@ -36,12 +36,12 @@ type Distributor struct {
 
 	alertmanagerRing        ring.ReadRing
 	alertmanagerClientsPool ClientsPool
-
-	logger log.Logger
+	ringConfig              RingConfig
+	logger                  log.Logger
 }
 
 // NewDistributor constructs a new Distributor
-func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
+func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, ringConfig RingConfig, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
 	if alertmanagerClientsPool == nil {
 		alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
 	}

@@ -52,6 +52,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
 		maxRecvMsgSize:          maxRecvMsgSize,
 		alertmanagerRing:        alertmanagersRing,
 		alertmanagerClientsPool: alertmanagerClientsPool,
+		ringConfig:              ringConfig,
 	}
 
 	d.Service = services.NewBasicService(nil, d.running, nil)

@@ -89,6 +90,9 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
 	if strings.HasSuffix(path.Dir(p), "/v2/silence") {
 		return true, merger.V2SilenceID{}
 	}
+	if strings.HasSuffix(p, "/v2/receivers") {
+		return true, merger.V2Receivers{}
+	}
 	return false, nil
 }
 

@@ -160,7 +164,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 	var responses []*httpgrpc.HTTPResponse
 	var responsesMtx sync.Mutex
 	grpcHeaders := httpToHttpgrpcHeaders(r.Header)
-	err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
+	err = ring.DoBatch(r.Context(), GetRingOp(d.ringConfig.DisableReplicaSetExtension), d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
 		// Use a background context to make sure all alertmanagers get the request even if we return early.
 		localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
 		sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")

@@ -207,7 +211,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 
 func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
 	key := users.ShardByUser(userID)
-	replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
+	replicationSet, err := d.alertmanagerRing.Get(key, GetRingOp(d.ringConfig.DisableReplicaSetExtension), nil, nil, nil)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
 		w.WriteHeader(http.StatusInternalServerError)

@@ -244,16 +248,30 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
 			instances[i], instances[j] = instances[j], instances[i]
 		})
 	} else {
-		//Picking 1 instance at Random for Non-Get and Non-Delete Unary Read requests, as shuffling through large number of instances might increase complexity
-		randN := rand.Intn(len(replicationSet.Instances))
-		instances = replicationSet.Instances[randN : randN+1]
+		// For POST requests, add retry logic to PutSilence
+		if d.isUnaryWritePath(r.URL.Path) {
+			instances = replicationSet.Instances
+			rand.Shuffle(len(instances), func(i, j int) {
+				instances[i], instances[j] = instances[j], instances[i]
+			})
+		} else {
+			// Other POST requests pick 1 instance at Random for Non-Get and Non-Delete Unary Read requests, as shuffling through large number of instances might increase complexity
+			randN := rand.Intn(len(replicationSet.Instances))
+			instances = replicationSet.Instances[randN : randN+1]
+		}
 	}
 
 	var lastErr error
 	for _, instance := range instances {
 		resp, err := d.doRequest(ctx, instance, req)
-		// storing the last error message
 		if err != nil {
+			// For PutSilence with non-retryable errors, fail immediately
+			if d.isUnaryWritePath(r.URL.Path) && !d.isRetryableError(err) {
+				level.Error(logger).Log("msg", "non-retryable error from alertmanager", "instance", instance.Addr, "err", err)
+				respondFromError(err, w, logger)
+				return
+			}
			// storing the last error message
 			lastErr = err
 			continue
 		}

@@ -267,6 +285,49 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
 	}
 }
 
+// isRetryableError determines if an error is retryable (network/availability issues)
+// vs non-retryable (bad request, validation errors)
+func (d *Distributor) isRetryableError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check if it's an HTTP error with a status code
+	httpResp, ok := httpgrpc.HTTPResponseFromError(errors.Cause(err))
+	if ok {
+		statusCode := int(httpResp.Code)
+
+		if statusCode == http.StatusRequestTimeout || statusCode == http.StatusTooManyRequests || statusCode >= 500 {
+			return true
+		}
+
+		if statusCode >= 400 && statusCode < 500 {
+			return false
+		}
+	}
+
+	// Network errors, context errors, etc. are retryable
+	errorStr := err.Error()
+	retryablePatterns := []string{
+		"connection refused",
+		"connection reset",
+		"timeout",
+		"context deadline exceeded",
+		"no such host",
+		"network is unreachable",
+		"broken pipe",
+	}
+
+	for _, pattern := range retryablePatterns {
+		if strings.Contains(strings.ToLower(errorStr), pattern) {
+			return true
+		}
+	}
+
+	// Default to retryable for unknown errors to maximize availability
+	return true
+}
+
 func respondFromError(err error, w http.ResponseWriter, logger log.Logger) {
 	httpResp, ok := httpgrpc.HTTPResponseFromError(errors.Cause(err))
 	if !ok {
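The retry classification above reads as a small decision table: 408, 429 and all 5xx responses are worth retrying on another replica, other 4xx responses fail fast, and plain transport errors are retried by default. Below is a standalone sketch of that rule; the github.com/weaveworks/common/httpgrpc import path is assumed (the diff only shows the httpgrpc identifier), and this is a simplified re-implementation, not the unexported distributor method.

package main

import (
	"errors"
	"fmt"
	"net/http"
	"strings"

	"github.com/weaveworks/common/httpgrpc"
)

// isRetryable mirrors the commit's rule: 408/429/5xx responses and transport
// failures are retryable, other 4xx responses are caller errors and are not.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}
	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
		code := int(resp.Code)
		if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests || code >= 500 {
			return true
		}
		if code >= 400 && code < 500 {
			return false
		}
	}
	msg := strings.ToLower(err.Error())
	for _, pattern := range []string{"connection refused", "connection reset", "timeout", "broken pipe"} {
		if strings.Contains(msg, pattern) {
			return true
		}
	}
	// Unknown errors default to retryable to favour availability.
	return true
}

func main() {
	fmt.Println(isRetryable(httpgrpc.Errorf(http.StatusBadRequest, "invalid silence")))    // false: surface to the caller, do not try other replicas
	fmt.Println(isRetryable(httpgrpc.Errorf(http.StatusServiceUnavailable, "overloaded"))) // true: try the next replica
	fmt.Println(isRetryable(errors.New("dial tcp 10.0.0.1:9094: connection refused")))     // true
}

Note that the shuffle-and-retry loop only applies to requests matched by isUnaryWritePath (defined outside this diff); other unary POSTs still go to a single randomly chosen instance.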

pkg/alertmanager/distributor_test.go

Lines changed: 12 additions & 1 deletion

@@ -233,6 +233,16 @@ func TestDistributor_DistributeRequest(t *testing.T) {
 			expStatusCode:      http.StatusOK,
 			expectedTotalCalls: 1,
 			route:              "/receivers",
+		}, {
+			name:               "Read /v2/receivers is sent to 3 AMs",
+			numAM:              5,
+			numHappyAM:         5,
+			replicationFactor:  3,
+			isRead:             true,
+			expStatusCode:      http.StatusOK,
+			expectedTotalCalls: 3,
+			route:              "/v2/receivers",
+			responseBody:       []byte(`[{"name":"default"},{"name":"slack"}]`),
 		}, {
 			name:               "Write /receivers not supported",
 			numAM:              5,

@@ -352,8 +362,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod
 
 	cfg := &MultitenantAlertmanagerConfig{}
 	flagext.DefaultValues(cfg)
+	cfg.ShardingRing.DisableReplicaSetExtension = false
 
-	d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
+	d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), cfg.ShardingRing, util_log.Logger, prometheus.NewRegistry())
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
 

New file (package merger)

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+package merger
+
+import (
+	"github.com/go-openapi/swag/jsonutils"
+	v2_models "github.com/prometheus/alertmanager/api/v2/models"
+)
+
+// V2Receivers implements the Merger interface for GET /v2/receivers. It returns the union of receivers
+// over all the responses. When a receiver with the same name exists in multiple responses, any one
+// of them is returned (they should be identical across replicas).
+type V2Receivers struct{}
+
+func (V2Receivers) MergeResponses(in [][]byte) ([]byte, error) {
+	receivers := make([]*v2_models.Receiver, 0)
+	for _, body := range in {
+		parsed := make([]*v2_models.Receiver, 0)
+		if err := jsonutils.ReadJSON(body, &parsed); err != nil {
+			return nil, err
+		}
+		receivers = append(receivers, parsed...)
+	}
+
+	merged := mergeV2Receivers(receivers)
+	return jsonutils.WriteJSON(merged)
+}
+
+func mergeV2Receivers(in []*v2_models.Receiver) []*v2_models.Receiver {
+	// Deduplicate receivers by name. Since receivers should be identical across replicas
+	// (they come from the same config), we just keep the first occurrence of each name.
+	seen := make(map[string]bool)
+	result := make([]*v2_models.Receiver, 0)
+
+	for _, receiver := range in {
+		if receiver.Name == nil {
+			continue
+		}
+
+		name := *receiver.Name
+		if !seen[name] {
+			seen[name] = true
+			result = append(result, receiver)
+		}
+	}
+
+	return result
+}
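To make the merge rule concrete: with replication factor 3, each replica answers GET /v2/receivers from the same tenant configuration, and the merger returns the de-duplicated union keyed by receiver name. A stdlib-only sketch of that rule follows; the receiver struct is a stand-in for the generated v2_models.Receiver (whose Name field is actually a *string), so this is illustrative rather than a call into the new merger package.

package main

import (
	"encoding/json"
	"fmt"
)

// receiver mirrors only the field the merge logic keys on.
type receiver struct {
	Name string `json:"name"`
}

// mergeByName reproduces the commit's dedup rule: union the replica responses
// and keep the first occurrence of each receiver name.
func mergeByName(bodies [][]byte) ([]receiver, error) {
	seen := map[string]bool{}
	out := []receiver{}
	for _, body := range bodies {
		var parsed []receiver
		if err := json.Unmarshal(body, &parsed); err != nil {
			return nil, err
		}
		for _, r := range parsed {
			if !seen[r.Name] {
				seen[r.Name] = true
				out = append(out, r)
			}
		}
	}
	return out, nil
}

func main() {
	replicaA := []byte(`[{"name":"default"},{"name":"slack"}]`)
	replicaB := []byte(`[{"name":"slack"},{"name":"pagerduty"}]`)
	merged, _ := mergeByName([][]byte{replicaA, replicaB})
	fmt.Println(merged) // [{default} {slack} {pagerduty}]
}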
