diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 6a4943c665..2995b00baf 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -544,6 +544,12 @@ sharding_ring:
   # CLI flag: -alertmanager.sharding-ring.detailed-metrics-enabled
   [detailed_metrics_enabled: <boolean> | default = true]
 
+  # Disable extending the replica set when instances are unhealthy. This limits
+  # blast radius during config corruption incidents but reduces availability
+  # during normal failures.
+  # CLI flag: -alertmanager.sharding-ring.disable-replica-set-extension
+  [disable_replica_set_extension: <boolean> | default = false]
+
   # The sleep seconds when alertmanager is shutting down. Need to be close to or
   # larger than KV Store information propagation delay
   # CLI flag: -alertmanager.sharding-ring.final-sleep
diff --git a/pkg/alertmanager/alertmanager_ring.go b/pkg/alertmanager/alertmanager_ring.go
index 33d72daeeb..5c0280a68f 100644
--- a/pkg/alertmanager/alertmanager_ring.go
+++ b/pkg/alertmanager/alertmanager_ring.go
@@ -38,18 +38,45 @@ var SyncRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, fun
 	return s != ring.ACTIVE
 })
 
+// Blast radius limited ring operations (with extension disabled)
+var RingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
+	// Never extend replica set to limit blast radius during config corruption incidents
+	return false
+})
+
+var SyncRingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, func(s ring.InstanceState) bool {
+	// Never extend replica set during sync to limit blast radius during config corruption incidents
+	return false
+})
+
+// Helper functions to select the appropriate ring operation based on config
+func getRingOp(disableExtension bool) ring.Operation {
+	if disableExtension {
+		return RingOpNoExtension
+	}
+	return RingOp
+}
+
+func getSyncRingOp(disableExtension bool) ring.Operation {
+	if disableExtension {
+		return SyncRingOpNoExtension
+	}
+	return SyncRingOp
+}
+
 // RingConfig masks the ring lifecycler config which contains
 // many options not really required by the alertmanager ring. This config
 // is used to strip down the config to the minimum, and avoid confusion
 // to the user.
 type RingConfig struct {
-	KVStore                kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
-	HeartbeatPeriod        time.Duration `yaml:"heartbeat_period"`
-	HeartbeatTimeout       time.Duration `yaml:"heartbeat_timeout"`
-	ReplicationFactor      int           `yaml:"replication_factor"`
-	ZoneAwarenessEnabled   bool          `yaml:"zone_awareness_enabled"`
-	TokensFilePath         string        `yaml:"tokens_file_path"`
-	DetailedMetricsEnabled bool          `yaml:"detailed_metrics_enabled"`
+	KVStore                    kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
+	HeartbeatPeriod            time.Duration `yaml:"heartbeat_period"`
+	HeartbeatTimeout           time.Duration `yaml:"heartbeat_timeout"`
+	ReplicationFactor          int           `yaml:"replication_factor"`
+	ZoneAwarenessEnabled       bool          `yaml:"zone_awareness_enabled"`
+	TokensFilePath             string        `yaml:"tokens_file_path"`
+	DetailedMetricsEnabled     bool          `yaml:"detailed_metrics_enabled"`
+	DisableReplicaSetExtension bool          `yaml:"disable_replica_set_extension"`
 
 	FinalSleep               time.Duration `yaml:"final_sleep"`
 	WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"`
@@ -90,6 +117,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
 	f.StringVar(&cfg.TokensFilePath, rfprefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
 	f.BoolVar(&cfg.DetailedMetricsEnabled, rfprefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.")
+	f.BoolVar(&cfg.DisableReplicaSetExtension, rfprefix+"disable-replica-set-extension", false, "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.")
 
 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
diff --git a/pkg/alertmanager/alertmanager_ring_test.go b/pkg/alertmanager/alertmanager_ring_test.go
index ec1f3008fa..130bc87a75 100644
--- a/pkg/alertmanager/alertmanager_ring_test.go
+++ b/pkg/alertmanager/alertmanager_ring_test.go
@@ -52,3 +52,52 @@ func TestIsHealthyForAlertmanagerOperations(t *testing.T) {
 		})
 	}
 }
+
+func TestBlastRadiusProtection(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		operation ring.Operation
+		instance  *ring.InstanceDesc
+		timeout   time.Duration
+		expected  bool
+	}{
+		"RingOp extends to unhealthy ACTIVE instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOpNoExtension excludes unhealthy ACTIVE instance": {
+			operation: RingOpNoExtension,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOp extends to LEAVING instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"RingOpNoExtension excludes LEAVING instance": {
+			operation: RingOpNoExtension,
+			instance:  &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  false,
+		},
+		"Both operations include healthy ACTIVE instance": {
+			operation: RingOp,
+			instance:  &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
+			timeout:   time.Minute,
+			expected:  true,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			actual := testData.instance.IsHealthy(testData.operation, testData.timeout, time.Now())
+			assert.Equal(t, testData.expected, actual)
+		})
+	}
+}
diff --git a/pkg/alertmanager/distributor.go b/pkg/alertmanager/distributor.go
index e7063d62e4..4a413c1fb4 100644
--- a/pkg/alertmanager/distributor.go
+++ b/pkg/alertmanager/distributor.go
@@ -36,12 +36,12 @@ type Distributor struct {
 	alertmanagerRing        ring.ReadRing
 	alertmanagerClientsPool ClientsPool
-
-	logger log.Logger
+	ringConfig              RingConfig
+	logger                  log.Logger
 }
 
 // NewDistributor constructs a new Distributor
-func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
+func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, ringConfig RingConfig, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
 	if alertmanagerClientsPool == nil {
 		alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
 	}
 
@@ -52,6 +52,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
 		maxRecvMsgSize:          maxRecvMsgSize,
 		alertmanagerRing:        alertmanagersRing,
 		alertmanagerClientsPool: alertmanagerClientsPool,
+		ringConfig:              ringConfig,
 	}
 
 	d.Service = services.NewBasicService(nil, d.running, nil)
@@ -160,7 +161,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 	var responses []*httpgrpc.HTTPResponse
 	var responsesMtx sync.Mutex
 	grpcHeaders := httpToHttpgrpcHeaders(r.Header)
-	err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
+	err = ring.DoBatch(r.Context(), getRingOp(d.ringConfig.DisableReplicaSetExtension), d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
 		// Use a background context to make sure all alertmanagers get the request even if we return early.
 		localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
 		sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
@@ -207,7 +208,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
 
 func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
 	key := users.ShardByUser(userID)
-	replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
+	replicationSet, err := d.alertmanagerRing.Get(key, getRingOp(d.ringConfig.DisableReplicaSetExtension), nil, nil, nil)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
 		w.WriteHeader(http.StatusInternalServerError)
diff --git a/pkg/alertmanager/distributor_test.go b/pkg/alertmanager/distributor_test.go
index fb0dd42ace..bb0c1253d3 100644
--- a/pkg/alertmanager/distributor_test.go
+++ b/pkg/alertmanager/distributor_test.go
@@ -352,8 +352,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod
 
 	cfg := &MultitenantAlertmanagerConfig{}
 	flagext.DefaultValues(cfg)
+	cfg.ShardingRing.DisableReplicaSetExtension = false
 
-	d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
+	d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), cfg.ShardingRing, util_log.Logger, prometheus.NewRegistry())
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
 
diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go
index 705577209e..7373f45025 100644
--- a/pkg/alertmanager/multitenant.go
+++ b/pkg/alertmanager/multitenant.go
@@ -435,7 +435,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
 	am.grpcServer = server.NewServer(&handlerForGRPCServer{am: am})
 
 	am.alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(am.ring), cfg.AlertmanagerClient, logger, am.registry)
-	am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
+	am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, cfg.ShardingRing, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
 	if err != nil {
 		return nil, errors.Wrap(err, "create distributor")
 	}
@@ -515,7 +515,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) {
 	if am.cfg.ShardingEnabled {
 		// Store the ring state after the initial Alertmanager configs sync has been done and before we do change
 		// our state in the ring.
-		am.ringLastState, _ = am.ring.GetAllHealthy(RingOp)
+		am.ringLastState, _ = am.ring.GetAllHealthy(getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension))
 
 		// Make sure that all the alertmanagers we were initially configured with have
 		// fetched state from the replicas, before advertising as ACTIVE. This will
@@ -688,7 +688,7 @@ func (am *MultitenantAlertmanager) run(ctx context.Context) error {
 		case <-ringTickerChan:
 			// We ignore the error because in case of error it will return an empty
 			// replication set which we use to compare with the previous state.
-			currRingState, _ := am.ring.GetAllHealthy(RingOp)
+			currRingState, _ := am.ring.GetAllHealthy(getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension))
 
 			if ring.HasReplicationSetChanged(am.ringLastState, currRingState) {
 				am.ringLastState = currRingState
@@ -828,7 +828,7 @@ func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
 		return true
 	}
 
-	alertmanagers, err := am.ring.Get(users.ShardByUser(userID), SyncRingOp, nil, nil, nil)
+	alertmanagers, err := am.ring.Get(users.ShardByUser(userID), getSyncRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
 	if err != nil {
 		am.ringCheckErrors.Inc()
 		level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err)
@@ -1038,7 +1038,7 @@ func (am *MultitenantAlertmanager) GetPositionForUser(userID string) int {
 		return 0
 	}
 
-	set, err := am.ring.Get(users.ShardByUser(userID), RingOp, nil, nil, nil)
+	set, err := am.ring.Get(users.ShardByUser(userID), getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
 	if err != nil {
 		level.Error(am.logger).Log("msg", "unable to read the ring while trying to determine the alertmanager position", "err", err)
 		// If we're unable to determine the position, we don't want a tenant to miss out on the notification - instead,
@@ -1139,7 +1139,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
 	level.Debug(am.logger).Log("msg", "message received for replication", "user", userID, "key", part.Key)
 
 	selfAddress := am.ringLifecycler.GetInstanceAddr()
-	err := ring.DoBatch(ctx, RingOp, am.ring, nil, []uint32{users.ShardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
+	err := ring.DoBatch(ctx, getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), am.ring, nil, []uint32{users.ShardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
 		if desc.GetAddr() == selfAddress {
 			return nil
 		}
@@ -1171,7 +1171,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, us
 func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
 	// Only get the set of replicas which contain the specified user.
 	key := users.ShardByUser(userID)
-	replicationSet, err := am.ring.Get(key, RingOp, nil, nil, nil)
+	replicationSet, err := am.ring.Get(key, getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 67cd48dc87..76eeb1a403 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -347,6 +347,12 @@
         "type": "boolean",
         "x-cli-flag": "alertmanager.sharding-ring.detailed-metrics-enabled"
       },
+      "disable_replica_set_extension": {
+        "default": false,
+        "description": "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.",
+        "type": "boolean",
+        "x-cli-flag": "alertmanager.sharding-ring.disable-replica-set-extension"
+      },
       "final_sleep": {
         "default": "0s",
         "description": "The sleep seconds when alertmanager is shutting down. Need to be close to or larger than KV Store information propagation delay",
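
Note (editorial sketch, not part of the patch): the snippet below illustrates the mechanism the new flag toggles. It assumes the Cortex ring package used throughout the patch exposes ring.NewOp and an Operation.ShouldExtendReplicaSetOnState method configured by NewOp's second argument; the names withExtension and noExtension are illustrative only. With the default RingOp the ring extends the replica set past any instance that is not ACTIVE, so traffic (and a corrupted configuration) can reach additional replicas; with RingOpNoExtension the replica set is never extended, which limits the blast radius at the cost of availability during ordinary failures.

package main

import (
	"fmt"

	// Import path assumed from the Cortex repository layout.
	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// Built like RingOp in alertmanager_ring.go: extend when the instance is not ACTIVE.
	withExtension := ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		return s != ring.ACTIVE
	})
	// Built like RingOpNoExtension: never extend the replica set.
	noExtension := ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		return false
	})

	// For a LEAVING (or otherwise non-ACTIVE) instance, only the default
	// operation asks the ring to include an extra replica in the set.
	fmt.Println(withExtension.ShouldExtendReplicaSetOnState(ring.LEAVING)) // true
	fmt.Println(noExtension.ShouldExtendReplicaSetOnState(ring.LEAVING))   // false
}

Operationally the trade-off stays opt-in: leaving -alertmanager.sharding-ring.disable-replica-set-extension at its default of false keeps today's behaviour, while setting it (disable_replica_set_extension: true under the alertmanager sharding_ring block) favours containing a bad configuration over failing over to extra replicas.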