Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"
Expand All @@ -47,6 +48,12 @@ const (
defaultPlanExecutionTimeout = time.Second * 60
)

var globalRecoveryStep = uint64(time.Now().UnixNano())

func nextRecoveryStep() uint64 {
return atomic.AddUint64(&globalRecoveryStep, 1)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Stage transition graph: for more details, please check `Controller.HandleStoreHeartbeat()`
//
// +-----------+ +-----------+
Expand Down Expand Up @@ -122,10 +129,11 @@ type Controller struct {
cluster cluster
stage stage
// the round of recovery, which is an increasing number to identify the reports of each round
step uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool
step uint64
recoveryStartStep uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool
// planExecutionTimeout is the duration PD waits for a store to execute the recovery plan
// before dispatching the same plan again.
planExecutionTimeout time.Duration
Expand Down Expand Up @@ -175,6 +183,7 @@ func NewController(cluster cluster) *Controller {
func (u *Controller) reset() {
u.stage = Idle
u.step = 0
u.recoveryStartStep = 0
u.failedStores = make(map[uint64]struct{})
u.storeReports = make(map[uint64]*pdpb.StoreReport)
u.numStoresReported = 0
Expand Down Expand Up @@ -261,6 +270,8 @@ func (u *Controller) RemoveFailedStoresWithOptions(
}

u.timeout = time.Now().Add(time.Duration(timeout) * time.Second)
u.recoveryStartStep = nextRecoveryStep()
u.step = u.recoveryStartStep
u.failedStores = failedStores
u.autoDetect = autoDetect
if options.PlanExecutionTimeout > 0 {
Expand All @@ -271,6 +282,24 @@ func (u *Controller) RemoveFailedStoresWithOptions(
return nil
}

// AbortFailedStoresRemoval aborts the current unsafe recovery process in a best-effort way.
// It asks TiKV to exit force leader by dispatching empty recovery plans, but any plan that
// has already been delivered to TiKV may keep running until TiKV finishes or times it out.
func (u *Controller) AbortFailedStoresRemoval() error {
u.Lock()
defer u.Unlock()

if !isRunning(u.stage) {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no ongoing unsafe recovery")
}
if u.stage == ExitForceLeader {
return nil
}

u.handleErr(errors.New("aborted by operator"))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make abort a no-op when stage == ExitForceLeader, or keep the controller in ExitForceLeader and add a repeated-abort test?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted in 30e6fcd94: abort is now a no-op once the controller is already in ExitForceLeader, and TestAbortFailedStoresRemoval covers the repeated-abort path.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rechecked on current head cfef6f4c: AbortFailedStoresRemoval() still returns nil once the controller is already in ExitForceLeader, and TestAbortFailedStoresRemoval exercises the repeated-abort path after entering that stage. I also reran CGO_ENABLED=0 go test ./pkg/unsaferecovery -run 'TestAbortFailedStoresRemoval|TestUnsafeRecoveryStepIsUniqueAcrossRuns' -count=1 locally after confirming the branch is already up to date with upstream/master.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rechecked on the current head d2a8c217e: AbortFailedStoresRemoval() still returns nil once the controller is already in ExitForceLeader, keeps the stage there, and TestAbortFailedStoresRemoval still covers the repeated-abort path plus the empty recovery plan dispatch afterward. I also reran CGO_ENABLED=0 go test ./pkg/unsaferecovery -run 'TestAbortFailedStoresRemoval|TestUnsafeRecoveryStepIsUniqueAcrossRuns' -count=1 on this head.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rechecked on the current head d2a8c217e: AbortFailedStoresRemoval() still returns nil once the controller is already in ExitForceLeader, so repeated aborts remain a no-op and keep the stage there.

TestAbortFailedStoresRemoval still covers that repeated-abort path, the empty recovery plan dispatched on the next heartbeat, and the matching report afterward. I also reran these checks on this head:

  • CGO_ENABLED=0 go test ./pkg/unsaferecovery -run 'TestAbortFailedStoresRemoval|TestUnsafeRecoveryStepIsUniqueAcrossRuns' -count=1\n- CGO_ENABLED=0 go test -tags without_dashboard ./tests/server/api -run TestUnsafeOperationTestSuite -count=1\n- CGO_ENABLED=0 go test -tags without_dashboard ./pd-ctl/tests/unsafe -run TestRemoveFailedStores -count=1 from tools/

return nil
}

// Show returns the current status of ongoing unsafe recover operation.
func (u *Controller) Show() []StageOutput {
u.Lock()
Expand Down Expand Up @@ -584,8 +613,9 @@ func (u *Controller) changeStage(stage stage) {
output.Details = append(output.Details, fmt.Sprintf("triggered by error: %v", u.err.Error()))
}
case Finished:
if u.step > 1 {
// == 1 means no operation has done, no need to invalid cache
if u.step > u.recoveryStartStep+1 {
// Only CollectReport has finished when step == recoveryStartStep+1,
// which means no operation has done and no cache invalidation is needed.
u.cluster.ResetRegionCache()
}
output.Info = "Unsafe recovery Finished"
Expand Down Expand Up @@ -618,7 +648,7 @@ func (u *Controller) changeStage(stage stage) {
}
u.orphanedPeers = map[uint64][]*metapb.Peer{}
u.numStoresReported = 0
u.step += 1
u.step = nextRecoveryStep()
}

func (u *Controller) getForceLeaderPlanDigest() map[string][]string {
Expand Down
91 changes: 80 additions & 11 deletions pkg/unsaferecovery/unsafe_recovery_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestFinished(t *testing.T) {
re.Empty(resp.RecoveryPlan.Creates)
re.Empty(resp.RecoveryPlan.Demotes)
re.Nil(resp.RecoveryPlan.ForceLeader)
re.Equal(uint64(1), resp.RecoveryPlan.Step)
re.Equal(recoveryController.step, resp.RecoveryPlan.Step)
applyRecoveryPlan(re, storeID, reports, resp)
}

Expand Down Expand Up @@ -646,11 +646,11 @@ func TestForceLeaderFail(t *testing.T) {

req1 := newStoreHeartbeat(1, reports[1])
resp1 := &pdpb.StoreHeartbeatResponse{}
req1.StoreReport.Step = 1
req1.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req1, resp1)
req2 := newStoreHeartbeat(2, reports[2])
resp2 := &pdpb.StoreHeartbeatResponse{}
req2.StoreReport.Step = 1
req2.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req2, resp2)
re.Equal(ForceLeader, recoveryController.GetStage())
recoveryController.HandleStoreHeartbeat(req1, resp1)
Expand Down Expand Up @@ -765,7 +765,7 @@ func TestForceLeaderForCommitMerge(t *testing.T) {

req := newStoreHeartbeat(1, reports[1])
resp := &pdpb.StoreHeartbeatResponse{}
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ForceLeaderForCommitMerge, recoveryController.GetStage())

Expand Down Expand Up @@ -873,7 +873,7 @@ func TestAutoDetectWithOneLearner(t *testing.T) {
},
}
req := newStoreHeartbeat(1, &storeReport)
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
hasStore3AsFailedStore := false
Expand Down Expand Up @@ -1387,7 +1387,7 @@ func TestExecutionTimeout(t *testing.T) {
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ExitForceLeader, recoveryController.GetStage())
req.StoreReport = &pdpb.StoreReport{Step: 2}
req.StoreReport = &pdpb.StoreReport{Step: recoveryController.step}
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(Failed, recoveryController.GetStage())

Expand Down Expand Up @@ -1451,7 +1451,7 @@ func TestExitForceLeader(t *testing.T) {
IsForceLeader: true,
},
},
Step: 1,
Step: recoveryController.step,
},
}

Expand Down Expand Up @@ -1537,7 +1537,7 @@ func TestStep(t *testing.T) {
re.Equal(CollectReport, recoveryController.GetStage())

// valid store report
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ForceLeader, recoveryController.GetStage())

Expand Down Expand Up @@ -1767,7 +1767,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1}, {Id: 12, StoreId: 4}, {Id: 13, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
2: {PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
Expand All @@ -1779,7 +1779,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 5, Version: 8},
Peers: []*metapb.Peer{
{Id: 21, StoreId: 2}, {Id: 22, StoreId: 4}, {Id: 23, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
3: {PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
Expand All @@ -1791,7 +1791,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 4, Version: 6},
Peers: []*metapb.Peer{
{Id: 31, StoreId: 3}, {Id: 32, StoreId: 4}, {Id: 33, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
}

advanceUntilFinished(re, recoveryController, reports)
Expand Down Expand Up @@ -1967,6 +1967,75 @@ func TestRemoveFailedStores(t *testing.T) {
}, 60, false))
}

func TestAbortFailedStoresRemoval(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(ctx, opts)
coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster, true))
coordinator.Run()
for _, store := range newTestStores(2, "6.0.0") {
cluster.PutStore(store)
}
recoveryController := NewController(cluster)
re.Error(recoveryController.AbortFailedStoresRemoval())

re.NoError(recoveryController.RemoveFailedStores(map[uint64]struct{}{
1: {},
}, 60, false))
re.NoError(recoveryController.AbortFailedStoresRemoval())
re.Equal(ExitForceLeader, recoveryController.GetStage())
re.Contains(recoveryController.output[1].Details[0], "aborted by operator")
re.NoError(recoveryController.AbortFailedStoresRemoval())
re.Equal(ExitForceLeader, recoveryController.GetStage())

resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(2, nil), resp)
re.NotNil(resp.GetRecoveryPlan())
re.Empty(resp.GetRecoveryPlan().GetForceLeader().GetEnterForceLeaders())
re.Empty(resp.GetRecoveryPlan().GetCreates())
re.Empty(resp.GetRecoveryPlan().GetDemotes())
re.Empty(resp.GetRecoveryPlan().GetTombstones())

report := &pdpb.StoreReport{Step: resp.GetRecoveryPlan().GetStep()}
recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(2, report), &pdpb.StoreHeartbeatResponse{})
re.Equal(Failed, recoveryController.GetStage())
}

func TestUnsafeRecoveryStepIsUniqueAcrossRuns(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(ctx, opts)
coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster, true))
coordinator.Run()
for _, store := range newTestStores(1, "6.0.0") {
cluster.PutStore(store)
}
recoveryController := NewController(cluster)
re.NoError(recoveryController.RemoveFailedStores(nil, 60, true))
oldStep := recoveryController.step
recoveryController.changeStage(ExitForceLeader)
exitForceLeaderStep := recoveryController.step
re.NotEqual(oldStep, exitForceLeaderStep)
recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(1, &pdpb.StoreReport{Step: oldStep}), &pdpb.StoreHeartbeatResponse{})
re.Equal(ExitForceLeader, recoveryController.GetStage())
re.Nil(recoveryController.storeReports[1])
recoveryController.changeStage(Failed)

re.NoError(recoveryController.RemoveFailedStores(nil, 60, true))
newStep := recoveryController.step
re.NotEqual(oldStep, newStep)

recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(1, &pdpb.StoreReport{Step: oldStep}), &pdpb.StoreHeartbeatResponse{})
re.Equal(CollectReport, recoveryController.GetStage())
re.Nil(recoveryController.storeReports[1])
}

func TestRunning(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
unsafeOperationHandler := newUnsafeOperationHandler(svr, rd)
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores",
unsafeOperationHandler.RemoveFailedStores, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/abort",
unsafeOperationHandler.AbortFailedStoresRemoval, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/show",
unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus))

Expand Down
27 changes: 27 additions & 0 deletions server/api/unsafe_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (

"github.com/unrolled/render"

perrors "github.com/pingcap/errors"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -104,6 +107,30 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
h.rd.JSON(w, http.StatusOK, "Request has been accepted.")
}

// AbortFailedStoresRemoval aborts the current failed stores removal.
//
// @Tags unsafe
// @Summary Abort the current failed stores removal.
// @Produce json
//
// Success 200 {string} string "Request has been accepted."
// Failure 400 {string} string "There is no ongoing failed stores removal."
// Failure 500 {string} string "PD server failed to proceed the request."
//
// @Router /admin/unsafe/remove-failed-stores/abort [post]
func (h *unsafeOperationHandler) AbortFailedStoresRemoval(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
if err := rc.GetUnsafeRecoveryController().AbortFailedStoresRemoval(); err != nil {
if perrors.ErrorEqual(err, errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no ongoing unsafe recovery")) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, "Request has been accepted.")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func parseTimeout(input map[string]any) (uint64, error) {
raw, exists := input["timeout"]
if !exists {
Expand Down
7 changes: 7 additions & 0 deletions tests/server/api/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,19 @@ func (suite *unsafeOperationTestSuite) checkRemoveFailedStores(cluster *tests.Te
testutil.StringContain(re, "disable-paranoid-check is specified multiple times"))
re.NoError(err)

err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores/abort", nil, testutil.Status(re, 400),
testutil.StringEqual(re, "\"[PD:unsaferecovery:ErrUnsafeRecoveryInvalidInput]invalid input no ongoing unsafe recovery\"\n"))
re.NoError(err)

input = map[string]any{"stores": []uint64{1}, "plan-execution-timeout": 300, "disable-paranoid-check": true}
data, err = json.Marshal(input)
re.NoError(err)
err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores", data, testutil.StatusOK(re))
re.NoError(err)

err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores/abort", nil, testutil.StatusOK(re))
re.NoError(err)

// Test show
var output []unsaferecovery.StageOutput
err = testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix+"/remove-failed-stores/show", &output)
Expand Down
15 changes: 15 additions & 0 deletions tools/pd-ctl/pdctl/command/unsafe_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewRemoveFailedStoresCommand() *cobra.Command {
Note: DO NOT RECOMMEND to use this flag for general use, it's used only for case that PD doesn't have the store information of failed stores after pd-recover;
Note: Do it with caution to make sure all live stores's heartbeats has been reported PD already, otherwise it may regarded some stores as failed mistakenly.`)
cmd.AddCommand(NewRemoveFailedStoresShowCommand())
cmd.AddCommand(NewRemoveFailedStoresAbortCommand())
return cmd
}

Expand All @@ -65,6 +66,15 @@ func NewRemoveFailedStoresShowCommand() *cobra.Command {
}
}

// NewRemoveFailedStoresAbortCommand returns the unsafe remove failed stores abort command.
func NewRemoveFailedStoresAbortCommand() *cobra.Command {
return &cobra.Command{
Use: "abort",
Short: "Abort the current failed stores removal",
Run: removeFailedStoresAbortCommandFunc,
}
}

func removeFailedStoresCommandFunc(cmd *cobra.Command, args []string) {
prefix := fmt.Sprintf("%s/remove-failed-stores", unsafePrefix)
postInput := make(map[string]any, 3)
Expand Down Expand Up @@ -150,3 +160,8 @@ func removeFailedStoresShowCommandFunc(cmd *cobra.Command, _ []string) {
}
cmd.Println(resp)
}

func removeFailedStoresAbortCommandFunc(cmd *cobra.Command, _ []string) {
prefix := fmt.Sprintf("%s/remove-failed-stores/abort", unsafePrefix)
postJSON(cmd, prefix, nil)
}
3 changes: 3 additions & 0 deletions tools/pd-ctl/tests/unsafe/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func TestRemoveFailedStores(t *testing.T) {
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "1,2,3", "--timeout", "abc"}
_, err = tests.ExecuteCommand(cmd, args...)
re.Error(err)
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "abort"}
_, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "show"}
_, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
Expand Down
Loading