
Commit c0b70cb

Replication Misc Fixes (#1467)
* reset iterator at failover
* change order of dispose to improve unit test reliability
1 parent 3d7596f commit c0b70cb

File tree

3 files changed, +18 -2 lines changed


libs/cluster/Server/Failover/ReplicaFailoverSession.cs

Lines changed: 3 additions & 0 deletions
@@ -138,6 +138,9 @@ private bool TakeOverAsPrimary()
             // Update replicationIds and replicationOffset2
             clusterProvider.replicationManager.TryUpdateForFailover();

+            // Reset replay iterators
+            clusterProvider.replicationManager.ResetReplayIterator();
+
             // Initialize checkpoint history
             if (!clusterProvider.replicationManager.InitializeCheckpointStore())
                 logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(TakeOverAsPrimary));
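
The new call matters because a replay iterator holds a position in the AOF stream received from the old primary; if it survived failover, the promoted node could keep consuming stale records. A minimal hypothetical sketch of the reset idea (ReplayState, aofScanIterator, and syncLock are illustrative names, not Garnet's actual types):

using System;

// Hypothetical sketch, not Garnet's implementation: resetting the replay
// iterator drops the in-flight AOF scan so a stale position from the old
// primary cannot be replayed after this node takes over.
sealed class ReplayState : IDisposable
{
    readonly object syncLock = new();
    IDisposable aofScanIterator; // handle for the current AOF scan, if any

    public void ResetReplayIterator()
    {
        lock (syncLock)
        {
            aofScanIterator?.Dispose(); // discard the stale scan position
            aofScanIterator = null;     // a fresh iterator is created on the next replay
        }
    }

    public void Dispose() => ResetReplayIterator();
}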

libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs

Lines changed: 14 additions & 1 deletion
@@ -33,8 +33,17 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
         {
             // logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
             var currentConfig = clusterProvider.clusterManager.CurrentConfig;
+            var syncReplay = clusterProvider.serverOptions.ReplicationOffsetMaxLag == 0;
+
+            // Need to ensure that this replay task is allowed to complete before the replicaReplayGroup is disposed
+            // NOTE: this should not be expensive because every replay task has its own lock copy
+            // Cache invalidation happens only on dispose, which is a rare operation
+            var failReplay = syncReplay && !activeReplay.TryReadLock();
             try
             {
+                if (failReplay)
+                    throw new GarnetException($"Failed to acquire activeReplay lock!", LogLevel.Warning, clientResponse: false);
+
                 if (clusterProvider.replicationManager.CannotStreamAOF)
                 {
                     logger?.LogError("Replica is recovering cannot sync AOF");
@@ -120,9 +129,13 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
             catch (Exception ex)
            {
                logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ProcessPrimaryStream");
-                ResetReplayIterator();
                throw new GarnetException(ex.Message, ex, LogLevel.Warning, clientResponse: false);
            }
+            finally
+            {
+                if (syncReplay && !failReplay)
+                    activeReplay.ReadUnlock();
+            }
         }
     }
 }
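
The locking added here is the heart of the fix: when ReplicationOffsetMaxLag == 0 the replay runs synchronously, so each replay takes a read lock up front and releases it in the finally block only if it was actually acquired, while disposal of the replicaReplayGroup takes the write side and therefore waits for in-flight replays to drain. A minimal sketch of such a single-writer/multi-reader latch, assuming Interlocked-based reference counting (ReplayLatch and CloseForDispose are hypothetical names; Garnet's own lock type may differ):

using System.Threading;

// Minimal sketch of a single-writer/multi-reader latch: replay tasks take read
// locks; disposal takes the write side, which succeeds only once every reader
// has released, so in-flight replays finish before teardown proceeds.
sealed class ReplayLatch
{
    int state; // 0 = free, >0 = active reader count, -1 = closed for disposal

    // Replay path: fails only if the latch has already been closed.
    public bool TryReadLock()
    {
        while (true)
        {
            var current = Volatile.Read(ref state);
            if (current < 0)
                return false; // disposed; caller should fail the replay
            if (Interlocked.CompareExchange(ref state, current + 1, current) == current)
                return true;
        }
    }

    public void ReadUnlock() => Interlocked.Decrement(ref state);

    // Dispose path: spins until no readers remain, then closes the latch.
    public void CloseForDispose()
    {
        while (Interlocked.CompareExchange(ref state, -1, 0) != 0)
            Thread.Yield(); // wait for in-flight replay tasks to drain
    }
}

The failReplay flag in the diff exists for exactly this reason: if TryReadLock fails, the replay must throw without ever calling ReadUnlock, which is why the finally block checks syncReplay && !failReplay before releasing.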

libs/server/StoreWrapper.cs

Lines changed: 1 addition & 1 deletion
@@ -910,13 +910,13 @@ public void Dispose()
             disposed = true;

             itemBroker?.Dispose();
+            clusterProvider?.Dispose();
             monitor?.Dispose();
             luaTimeoutManager?.Dispose();
             ctsCommit?.Cancel();
             databaseManager.Dispose();

             ctsCommit?.Dispose();
-            clusterProvider?.Dispose();
         }
     }
 }
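
Moving clusterProvider?.Dispose() ahead of the other teardown steps complements the latch above: the cluster provider drives AOF replay into the stores that databaseManager owns, so it must drain first. A rough illustration of the ordering hazard, with hypothetical helper names:

using System;

static class DisposeOrdering
{
    // Tearing down the consumer (stores) before the producer (replay source)
    // leaves a window where an in-flight replay writes into a disposed store,
    // which is the kind of race that makes unit-test teardown flaky.
    static void WrongOrder(IDisposable replaySource, IDisposable stores)
    {
        stores.Dispose();        // stores are gone...
        replaySource.Dispose();  // ...while a replay task may still be mid-write
    }

    // Disposing the producer first drains in-flight replays (e.g., via the
    // latch sketched above) before the stores they write into go away.
    static void RightOrder(IDisposable replaySource, IDisposable stores)
    {
        replaySource.Dispose();  // waits for in-flight replays, blocks new ones
        stores.Dispose();        // safe: nothing can write into the stores now
    }
}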
