Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions pkg/storage/endpoint/region_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"strconv"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/keypath"
)

// RegionSyncerStorage defines the storage operations on the region syncer's
// cluster-level committed state.
type RegionSyncerStorage interface {
LoadRegionSyncerCommittedRegionCount() (uint64, error)
SaveRegionSyncerCommittedRegionCount(count uint64) error
}

var _ RegionSyncerStorage = (*StorageEndpoint)(nil)

// LoadRegionSyncerCommittedRegionCount loads the region count the current
// leader last published. It returns (0, nil) when the key is absent (a fresh
// cluster or one upgraded from a version that never wrote it), which callers
// treat as "no committed regions".
func (se *StorageEndpoint) LoadRegionSyncerCommittedRegionCount() (uint64, error) {
value, err := se.Load(keypath.RegionSyncerCommittedRegionCountPath())
if err != nil || value == "" {
return 0, err
}
count, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, errs.ErrStrconvParseUint.Wrap(err).GenWithStackByArgs()
}
return count, nil
}

// SaveRegionSyncerCommittedRegionCount persists the region count the current
// leader is serving so other members can tell whether they are caught up
// before campaigning for PD leadership.
func (se *StorageEndpoint) SaveRegionSyncerCommittedRegionCount(count uint64) error {
return se.Save(keypath.RegionSyncerCommittedRegionCountPath(), strconv.FormatUint(count, 10))
}
77 changes: 77 additions & 0 deletions pkg/storage/endpoint/region_syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"math"
"testing"

"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/keypath"
)

// newMemStorageEndpoint returns a StorageEndpoint backed by an in-memory KV,
// for tests that exercise endpoint encode/decode round-trips without etcd.
func newMemStorageEndpoint() *StorageEndpoint {
return NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

// TestRegionSyncerCommittedRegionCountAbsent verifies the back-compat path the
// leader-election gate relies on: an unwritten key (fresh or pre-upgrade
// cluster) must read back as (0, nil) rather than an error, so the gate treats
// it as "no committed regions" and allows the campaign.
func TestRegionSyncerCommittedRegionCountAbsent(t *testing.T) {
re := require.New(t)
se := newMemStorageEndpoint()

count, err := se.LoadRegionSyncerCommittedRegionCount()
re.NoError(err)
re.Equal(uint64(0), count)
}

// TestRegionSyncerCommittedRegionCountRoundTrip verifies Save/Load preserves the
// value across the uint64<->string boundary, including the zero (empty cluster)
// and max-uint64 edges, and that a later Save overwrites the prior value.
func TestRegionSyncerCommittedRegionCountRoundTrip(t *testing.T) {
re := require.New(t)
se := newMemStorageEndpoint()

for _, want := range []uint64{0, 1, 110, math.MaxUint64} {
re.NoError(se.SaveRegionSyncerCommittedRegionCount(want))
got, err := se.LoadRegionSyncerCommittedRegionCount()
re.NoError(err)
re.Equal(want, got)
}

// A subsequent write replaces the prior value (counts shrink on merges).
re.NoError(se.SaveRegionSyncerCommittedRegionCount(42))
got, err := se.LoadRegionSyncerCommittedRegionCount()
re.NoError(err)
re.Equal(uint64(42), got)
}

// TestRegionSyncerCommittedRegionCountCorruptValue verifies a non-numeric value
// (corruption or a manual edit) surfaces as an error so the gate falls back to
// allowing the campaign rather than silently treating it as zero.
func TestRegionSyncerCommittedRegionCountCorruptValue(t *testing.T) {
re := require.New(t)
se := newMemStorageEndpoint()

re.NoError(se.Save(keypath.RegionSyncerCommittedRegionCountPath(), "not-a-number"))
_, err := se.LoadRegionSyncerCommittedRegionCount()
re.Error(err)
}
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Storage interface {
endpoint.GCStateStorage
endpoint.MinResolvedTSStorage
endpoint.ExternalTSStorage
endpoint.RegionSyncerStorage
endpoint.KeyspaceStorage
endpoint.ResourceGroupStorage
endpoint.TSOStorage
Expand Down
56 changes: 54 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ const (
func (s *RegionSyncer) StopSyncWithLeader() {
s.reset()
s.wg.Wait()
// If this sync session never observed an end-of-history marker, the
// in-memory history index reflects a partial transfer that the leader
// never confirmed. Roll back to the last committed (persisted) index
// so the next leader sees a StartIndex that triggers a fresh bulk
// rather than a stale offset into a different leader's history space.
if !s.historySynced.Load() {
s.history.rollback()
}
}

func (s *RegionSyncer) reset() {
Expand All @@ -70,6 +78,22 @@ func (s *RegionSyncer) ResetHistoryIndex(index uint64) {
s.history.resetWithIndexAndPersist(index)
}

// HasAttemptedSync reports whether StartSyncWithLeader has ever been called
// during this process's lifetime. Used by the leader-election path to tell
// "I am a fresh single-node cluster" apart from "I am a follower that needs
// to be caught up before campaigning".
func (s *RegionSyncer) HasAttemptedSync() bool {
return s.attemptedSync.Load()
}

// IsHistorySynced reports whether this server has, at some point during its
// lifetime, observed that it was caught up to a leader's history. The signal
// is sticky: once true, the local region storage is durably populated and
// further syncs only extend that state.
func (s *RegionSyncer) IsHistorySynced() bool {
return s.historySynced.Load()
}

func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) {
cli := pdpb.NewPDClient(conn)
syncStream, err := cli.SyncRegions(ctx)
Expand All @@ -90,6 +114,9 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C

var regionGuide = core.GenerateRegionGuideFunc(false)

// handleRegionSyncResponse applies one sync response from the leader to the
// follower's region cache, storage, and history buffer. It returns whether the
// response was handled and whether a full sync is still in progress.
func (s *RegionSyncer) handleRegionSyncResponse(
ctx context.Context,
resp *pdpb.SyncRegionResponse,
Expand Down Expand Up @@ -126,6 +153,22 @@ func (s *RegionSyncer) handleRegionSyncResponse(
}
hasStats := len(stats) == len(regions)
hasBuckets := len(buckets) == len(regions)
// An empty-regions response is the explicit end-of-history
// marker: sent by the leader at the end of a bulk transfer,
// at the end of an incremental catch-up, as an "already in
// sync" reply, or as a keepalive. Receiving one commits the
// historical phase — we flush the accumulated history index
// to disk and flip historySynced, which both opens the
// leader-election gate and switches subsequent records from
// the non-persisting catch-up path to the normal persisting
// live-stream path.
if len(regions) == 0 {
if !s.historySynced.Load() {
s.history.commit()
}
s.historySynced.Store(true)
}
inCatchup := !s.historySynced.Load()
for i, r := range regions {
var (
region *core.RegionInfo
Expand Down Expand Up @@ -165,7 +208,16 @@ func (s *RegionSyncer) handleRegionSyncResponse(
err = regionStorage.SaveRegion(r)
}
if err == nil && !inFullSync {
s.history.record(region)
// Full-sync frames carry positional offsets, not reusable history
// indices, so they are applied to storage but not recorded here.
// Records during historical catch-up are buffered without persisting
// and flushed by commit() on the end-of-history marker; live records
// after catch-up persist normally.
if inCatchup {
s.history.recordNoPersist(region)
} else {
s.history.record(region)
}
}
for _, old := range overlaps {
_ = regionStorage.DeleteRegion(old.GetMeta())
Expand All @@ -187,7 +239,7 @@ func (s *RegionSyncer) IsRunning() bool {
// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)

s.attemptedSync.Store(true)
s.mu.Lock()
defer s.mu.Unlock()
s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(s.server.LoopContext())
Expand Down
50 changes: 50 additions & 0 deletions pkg/syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package syncer

import (
"context"
"strconv"
"testing"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockserver"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -73,6 +75,54 @@ func TestLoadRegion(t *testing.T) {
re.Less(time.Since(start), time.Second*2)
}

// TestHistorySyncedInitFromDurableState verifies that NewRegionSyncer
// seeds historySynced from the persisted history index so a node that
// previously completed a sync can still campaign after a restart followed
// by a leader death mid-sync. A fresh-on-disk node must stay false so it
// is forced through a real catch-up before it can campaign.
func TestHistorySyncedInitFromDurableState(t *testing.T) {
re := require.New(t)

newSyncer := func(seed func(kv.Base)) *RegionSyncer {
tempDir := t.TempDir()
rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil)
re.NoError(err)
t.Cleanup(func() { re.NoError(rs.Close()) })
seed(rs)
server := mockserver.NewMockServer(
context.Background(),
nil,
nil,
storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
core.NewBasicCluster(),
)
return NewRegionSyncer(server)
}

// Fresh KV: no persisted historyIndex. Stays false so the node is
// gated until it actually catches up to a leader.
rc := newSyncer(func(kv.Base) {})
re.False(rc.IsHistorySynced(),
"fresh KV should not be treated as already synced")

// Persisted historyIndex from a prior successful sync. The index only
// ever lands on disk after SaveRegion calls succeeded (via commit() or
// record()'s flushCount path), so a non-zero index is sufficient
// evidence of durable region state.
rc = newSyncer(func(b kv.Base) {
re.NoError(b.Save(historyKey, "42"))
})
re.True(rc.IsHistorySynced(),
"persisted history index must initialize historySynced to true")
re.Equal(uint64(42), rc.history.getNextIndex(),
"history index should reload the persisted value")
// Sanity: a value that round-trips through strconv matches what
// history_buffer's persist path writes.
v, err := strconv.ParseUint("42", 10, 64)
re.NoError(err)
re.Equal(rc.history.getNextIndex(), v)
}

func TestErrorCode(t *testing.T) {
re := require.New(t)
tempDir := t.TempDir()
Expand Down
46 changes: 46 additions & 0 deletions pkg/syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (h *historyBuffer) capacity() int {
return h.size - 1
}

// record appends a region change to the history buffer, advancing the next
// index and periodically persisting it to the backing storage.
func (h *historyBuffer) record(r *core.RegionInfo) {
h.Lock()
defer h.Unlock()
Expand All @@ -142,6 +144,23 @@ func (h *historyBuffer) record(r *core.RegionInfo) {
}
}

// recordNoPersist is like record but does not advance the on-disk
// historyIndex. The syncer client uses it during a historical catch-up
// so that a partially-received bulk does not commit a non-zero index; if
// the catch-up is interrupted (leader failover, process crash), the next
// sync starts from the last committed index instead of the middle of a
// half-applied transfer.
func (h *historyBuffer) recordNoPersist(r *core.RegionInfo) {
h.Lock()
defer h.Unlock()
syncIndexGauge.Set(float64(h.index))
h.records[h.tail] = r
h.tail = (h.tail + 1) % h.size
if h.tail == h.head {
h.head = (h.head + 1) % h.size
}
h.index++
}
func (h *historyBuffer) recordsFrom(index uint64) []*core.RegionInfo {
h.RLock()
defer h.RUnlock()
Expand Down Expand Up @@ -379,10 +398,37 @@ func (h *historyBuffer) resizeLocked(newCapacity int) {
h.tail = keep % h.size
}

// getLocked returns the region recorded at the given history index, or nil if
// the index is outside the retained window. The caller must hold the lock.
func (h *historyBuffer) getLocked(index uint64) *core.RegionInfo {
if index < h.nextIndex() && index >= h.firstIndex() {
pos := (h.head + int(index-h.firstIndex())) % h.size
return h.records[pos]
}
return nil
}

// commit persists the current historyIndex. The syncer client calls this
// when an end-of-history marker is received, to atomically commit a bulk
// or catch-up transfer.
func (h *historyBuffer) commit() {
h.Lock()
defer h.Unlock()
h.persist()
h.flushCount = defaultFlushCount
}

// rollback resets the in-memory historyIndex back to the last value
// persisted on disk and clears the ring buffer. The syncer client calls
// this when a sync session ends without ever receiving an end-of-history
// marker, so the next sync starts from the last committed index (0 for a
// fresh member that never completed a catch-up).
func (h *historyBuffer) rollback() {
h.Lock()
defer h.Unlock()
h.index = 0
h.head = 0
h.tail = 0
h.flushCount = defaultFlushCount
h.reload()
}
Loading
Loading