-
Notifications
You must be signed in to change notification settings - Fork 774
pkg/unsaferecovery: add unsafe recovery abort #10641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
740e85c
30e6fcd
cfef6f4
d2a8c21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import ( | |
| "sort" | ||
| "strconv" | ||
| "strings" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "go.uber.org/zap" | ||
|
|
@@ -47,6 +48,12 @@ const ( | |
| defaultPlanExecutionTimeout = time.Second * 60 | ||
| ) | ||
|
|
||
| var globalRecoveryStep = uint64(time.Now().UnixNano()) | ||
|
|
||
| func nextRecoveryStep() uint64 { | ||
| return atomic.AddUint64(&globalRecoveryStep, 1) | ||
| } | ||
|
|
||
| // Stage transition graph: for more details, please check `Controller.HandleStoreHeartbeat()` | ||
| // | ||
| // +-----------+ +-----------+ | ||
|
|
@@ -122,10 +129,11 @@ type Controller struct { | |
| cluster cluster | ||
| stage stage | ||
| // the round of recovery, which is an increasing number to identify the reports of each round | ||
| step uint64 | ||
| failedStores map[uint64]struct{} | ||
| timeout time.Time | ||
| autoDetect bool | ||
| step uint64 | ||
| recoveryStartStep uint64 | ||
| failedStores map[uint64]struct{} | ||
| timeout time.Time | ||
| autoDetect bool | ||
| // planExecutionTimeout is the duration PD waits for a store to execute the recovery plan | ||
| // before dispatching the same plan again. | ||
| planExecutionTimeout time.Duration | ||
|
|
@@ -175,6 +183,7 @@ func NewController(cluster cluster) *Controller { | |
| func (u *Controller) reset() { | ||
| u.stage = Idle | ||
| u.step = 0 | ||
| u.recoveryStartStep = 0 | ||
| u.failedStores = make(map[uint64]struct{}) | ||
| u.storeReports = make(map[uint64]*pdpb.StoreReport) | ||
| u.numStoresReported = 0 | ||
|
|
@@ -261,6 +270,8 @@ func (u *Controller) RemoveFailedStoresWithOptions( | |
| } | ||
|
|
||
| u.timeout = time.Now().Add(time.Duration(timeout) * time.Second) | ||
| u.recoveryStartStep = nextRecoveryStep() | ||
| u.step = u.recoveryStartStep | ||
| u.failedStores = failedStores | ||
| u.autoDetect = autoDetect | ||
| if options.PlanExecutionTimeout > 0 { | ||
|
|
@@ -271,6 +282,24 @@ func (u *Controller) RemoveFailedStoresWithOptions( | |
| return nil | ||
| } | ||
|
|
||
| // AbortFailedStoresRemoval aborts the current unsafe recovery process in a best-effort way. | ||
| // It asks TiKV to exit force leader by dispatching empty recovery plans, but any plan that | ||
| // has already been delivered to TiKV may keep running until TiKV finishes or times it out. | ||
| func (u *Controller) AbortFailedStoresRemoval() error { | ||
| u.Lock() | ||
| defer u.Unlock() | ||
|
|
||
| if !isRunning(u.stage) { | ||
| return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no ongoing unsafe recovery") | ||
| } | ||
| if u.stage == ExitForceLeader { | ||
| return nil | ||
| } | ||
|
|
||
| u.handleErr(errors.New("aborted by operator")) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we make abort a no-op when stage == ExitForceLeader, or keep the controller in ExitForceLeader and add a repeated-abort test?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adjusted in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rechecked on current head
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rechecked on the current head
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rechecked on the current head
|
||
| return nil | ||
| } | ||
|
|
||
| // Show returns the current status of ongoing unsafe recover operation. | ||
| func (u *Controller) Show() []StageOutput { | ||
| u.Lock() | ||
|
|
@@ -584,8 +613,9 @@ func (u *Controller) changeStage(stage stage) { | |
| output.Details = append(output.Details, fmt.Sprintf("triggered by error: %v", u.err.Error())) | ||
| } | ||
| case Finished: | ||
| if u.step > 1 { | ||
| // == 1 means no operation has done, no need to invalid cache | ||
| if u.step > u.recoveryStartStep+1 { | ||
| // Only CollectReport has finished when step == recoveryStartStep+1, | ||
| // which means no operation has done and no cache invalidation is needed. | ||
| u.cluster.ResetRegionCache() | ||
| } | ||
| output.Info = "Unsafe recovery Finished" | ||
|
|
@@ -618,7 +648,7 @@ func (u *Controller) changeStage(stage stage) { | |
| } | ||
| u.orphanedPeers = map[uint64][]*metapb.Peer{} | ||
| u.numStoresReported = 0 | ||
| u.step += 1 | ||
| u.step = nextRecoveryStep() | ||
| } | ||
|
|
||
| func (u *Controller) getForceLeaderPlanDigest() map[string][]string { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.