From 09c5d3d05f419deb0177d3b9714c0bf21474cabf Mon Sep 17 00:00:00 2001
From: Roshan Khatri <117414976+roshkhatri@users.noreply.github.com>
Date: Tue, 9 Jun 2026 16:45:31 -0700
Subject: [PATCH 01/15] Fix IO-Threads redesign cleanup perf regression from
#3544 (#3938)
This regression is still present in 9.1 GA as the cherrypick of revert
commit was missed during release.
Re-applies #3544 (reverted in #3756 due to ~20% SET regression) with the
performance fix from #3760.
**Root Cause:** The original #3544 changed
`tryOffloadFreeObjToIOThreads` to only offload the SDS buffer free to IO
threads, freeing the `robj` shell on the main thread. I carried out
profiling for the change and it showed that freeing the `robj` shell on
the main thread became the prime main-thread hotspot (~10% CPU), while
IO threads shifted from doing real `jemalloc` work to spinning idle on
`spmcDequeue`.
**Fix**: Keep `tryOffloadFreeObjToIOThreads` offloading the entire robj
(`decrRefCount`) to the IO thread. Cross-thread `zfree` is safe with
`jemalloc`.
This PR includes all cleanup work from #3544 so -
- `trySendWriteToIOThreads`: defer clearing `last_header` until after
successful enqueue
- `evictClients`: simplified bookkeeping
- Queue sizes as runtime parameters instead of compile-time macros
- IO ignition policy using `stat_active_time` instead of `getrusage`
- Function renames (`IOThreadFreeArgv` --> `ioThreadFreeArgv`, etc.) and
doc comments
**Benchmark** on (Graviton4 c8gb.metal-48xl):
Config: SET, 128B values, 9 IO threads, pipeline=10, 1600 clients - Same
as Valkey official method
| Version | Throughput |
|---------|-----------|
| Unstable + original #3544 | ~1,554K rps |
| Unstable + this PR | ~2,116K rps |
Diff vs original #3544 (perf fix)
```diff
diff --git a/src/io_threads.c b/src/io_threads.c
--- a/src/io_threads.c
+++ b/src/io_threads.c
@@ // IO thread handler
case JOB_REQ_FREE_OBJ:
- zfree(data);
+ decrRefCount(data);
break;
@@ // tryOffloadFreeObjToIOThreads
- /* We offload only the free of the ptr that may be allocated by the I/O thread.
- * The object itself was allocated by the main thread and will be freed by the main thread. */
- void *job = tagJob(sdsAllocPtr(objectGetVal(obj)), JOB_REQ_FREE_OBJ);
+ void *job = tagJob(obj, JOB_REQ_FREE_OBJ);
if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) return C_ERR;
- objectSetVal(obj, NULL);
- decrRefCount(obj);
io_jobs_submitted++;
```
---------
Signed-off-by: Roshan Khatri
---
src/Makefile | 2 +-
src/io_threads.c | 84 ++++++++++++++++++----------------------
src/io_threads.h | 6 +--
src/networking.c | 19 +--------
src/queues.c | 79 ++++++++++++++++++++-----------------
src/queues.h | 40 +++++++++----------
src/server.h | 21 +++++-----
src/unit/test_queues.cpp | 9 +++--
8 files changed, 122 insertions(+), 138 deletions(-)
diff --git a/src/Makefile b/src/Makefile
index 3b81517f4d4..2812c4dbad7 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -582,7 +582,7 @@ ENGINE_SERVER_OBJ = \
ziplist.o \
zipmap.o \
zmalloc.o \
- queues.o
+ queues.o
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ = \
diff --git a/src/io_threads.c b/src/io_threads.c
index ca7d142fd9b..d6368f5524e 100644
--- a/src/io_threads.c
+++ b/src/io_threads.c
@@ -8,6 +8,10 @@
#include "queues.h"
#include
+#define IO_MPSC_QUEUE_SIZE 16384
+#define IO_SPMC_QUEUE_SIZE 4096
+#define IO_SPSC_QUEUE_SIZE 4096
+
static _Thread_local int thread_id = 0;
static _Thread_local mpscTicket io_thread_ticket = {0};
/* Backlog of responses when io_shared_outbox is full. Should be rare. */
@@ -44,8 +48,8 @@ static inline void untagJob(void *tagged_ptr, void **ptr, int *type) {
/* Handler prototypes */
void ioThreadReadQueryFromClient(client *c);
void ioThreadWriteToClient(client *c);
-void IOThreadFreeArgv(robj **argv);
-void IOThreadPoll(aeEventLoop *el);
+void ioThreadFreeArgv(robj **argv);
+void ioThreadPoll(aeEventLoop *el);
static void ioThreadAccept(client *c);
int inMainThread(void) {
@@ -107,9 +111,6 @@ void waitForClientIO(client *c) {
}
void IOThreadsBeforeSleep(long long current_time) {
-#ifndef RUSAGE_THREAD
- UNUSED(current_time);
-#endif
if (server.io_threads_num == 1) return;
serverAssert(inMainThread());
@@ -126,29 +127,23 @@ void IOThreadsBeforeSleep(long long current_time) {
}
}
-#ifdef RUSAGE_THREAD
- /* If threads are not active track main thread CPU time for ignition decision */
+ /* If threads are not active, track main-thread active time for ignition decision */
if (server.active_io_threads_num == 1) {
static long long last_measurement_time = 0;
if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */
last_measurement_time = current_time;
- struct rusage ru;
- if (getrusage(RUSAGE_THREAD, &ru) == 0) {
- long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec;
- long long user_time_us = ru.ru_utime.tv_sec * 1000000LL + ru.ru_utime.tv_usec;
- trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000);
- trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER, user_time_us, current_time, 1000000);
- }
+ trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, server.stat_active_time, current_time, 1000000);
}
-#endif
}
#define IO_COOLDOWN_MS 1000
#define IO_SAMPLE_RATE_MS 10
#define IO_IGNITION_EVENTS 4
-#define IO_IGNITION_CPU_SYS 30.0
-#define IO_IGNITION_CPU_SYS_LOW 5.0
-#define IO_IGNITION_CPU_USER 50.0
+/* Start using I/O threads when the main thread is active for more than the below
+ * defined percentage of the time. This number is picked somewhat arbitrarily but
+ * needed to be low enough to make sure we start the next thread quickly while not
+ * starting too many threads unnecessarily to avoid contention. */
+#define IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT 30
#define BATCH_SIZE 32
void IOThreadsAfterSleep(int numevents) {
@@ -171,15 +166,9 @@ void IOThreadsAfterSleep(int numevents) {
/* Ignition Policy */
if (server.active_io_threads_num == 1) {
int should_ignite = 0;
-#ifdef RUSAGE_THREAD
- float cpu_sys = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS) / 10000.0;
- float cpu_user = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER) / 10000.0;
- /* Ignite IO threads if sys CPU > 30%, or if sys CPU > 5% and user CPU > 50% */
- should_ignite = (cpu_sys > IO_IGNITION_CPU_SYS) ||
- (cpu_sys > IO_IGNITION_CPU_SYS_LOW && cpu_user > IO_IGNITION_CPU_USER);
-#else
- should_ignite = (numevents >= IO_IGNITION_EVENTS);
-#endif
+ float main_thread_active_time = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME) / 10000.0;
+ /* Ignite IO threads when main-thread active time exceeds the threshold (30%) */
+ should_ignite = (main_thread_active_time > (float)IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT);
if (should_ignite) {
pthread_mutex_unlock(&io_threads_mutex[1]);
server.active_io_threads_num++;
@@ -243,7 +232,7 @@ void IOThreadsAfterSleep(int numevents) {
/* This function performs polling on the given event loop and updates the server's
* IO fired events count and poll state. */
-void IOThreadPoll(aeEventLoop *el) {
+void ioThreadPoll(aeEventLoop *el) {
struct timeval tvp = {0, 0};
int num_events = aePoll(el, &tvp);
server.io_ae_fired_events = num_events;
@@ -326,10 +315,10 @@ static void *IOThreadMain(void *myid) {
switch (type) {
case JOB_REQ_FREE_ARGV:
- IOThreadFreeArgv((robj **)data);
+ ioThreadFreeArgv((robj **)data);
break;
case JOB_REQ_POLL:
- IOThreadPoll((aeEventLoop *)data);
+ ioThreadPoll((aeEventLoop *)data);
break;
default:
serverPanic("Invalid SPSC job type: %d", type);
@@ -338,10 +327,8 @@ static void *IOThreadMain(void *myid) {
processed += batch_count;
}
- /*
- * PRIORITY 2: Shared Global Queue (SPMC)
- * Only checked after SPSC is drained.
- */
+ /* PRIORITY 2: Shared Global Queue (SPMC)
+ * Only checked after SPSC is drained. */
void *tagged_job = spmcDequeue(&io_shared_inbox);
if (tagged_job) {
void *data;
@@ -362,7 +349,7 @@ static void *IOThreadMain(void *myid) {
ioThreadAccept((client *)data);
break;
case JOB_REQ_POLL:
- IOThreadPoll((aeEventLoop *)data);
+ ioThreadPoll((aeEventLoop *)data);
break;
default:
serverPanic("Invalid SPMC job type: %d", type);
@@ -398,7 +385,7 @@ static void createIOThread(int id) {
serverAssert(id > 0 && id < server.io_threads_num);
/* Initialize the private SPSC queue for this thread */
- spscInit(&io_private_inbox[id]);
+ spscInit(&io_private_inbox[id], IO_SPSC_QUEUE_SIZE);
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[id], NULL);
@@ -458,7 +445,7 @@ int updateIOThreads(const char **err) {
* in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */
size_t pending = getPendingIOResponsesCount();
- if (pending > MPSC_QUEUE_SIZE) {
+ if (pending > io_shared_outbox.queue_size) {
if (err) *err = "Can't update IO threads under load, try again later";
return 0;
}
@@ -497,8 +484,8 @@ void initIOThreads(int prev_threads_num) {
server.active_io_threads_num = 1; /* We start with threads not active. */
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;
- spmcInit(&io_shared_inbox);
- mpscInit(&io_shared_outbox);
+ spmcInit(&io_shared_inbox, IO_SPMC_QUEUE_SIZE);
+ mpscInit(&io_shared_outbox, IO_MPSC_QUEUE_SIZE);
io_jobs_submitted = 0;
atomic_init(&io_jobs_finished, 0);
prefetchCommandsBatchInit();
@@ -560,6 +547,7 @@ int trySendWriteToIOThreads(client *c) {
if (c->flag.lua_debug) return C_ERR;
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
+ clientReplyBlock *block = NULL;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
@@ -571,14 +559,10 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
- clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
+ block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
- /* If buffer is encoded force new header */
- if (block->flag.buf_encoded) block->last_header = NULL;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
- /* If buffer is encoded force new header */
- if (c->flag.buf_encoded) c->last_header = NULL;
}
}
@@ -597,7 +581,15 @@ int trySendWriteToIOThreads(client *c) {
c->io_last_bufpos = 0;
return C_ERR;
}
-
+ /* Force new header after successful enqueue so the main thread doesn't
+ * extend a header the I/O thread is currently reading. */
+ if (!is_replica) {
+ if (block) {
+ if (block->flag.buf_encoded) block->last_header = NULL;
+ } else {
+ if (c->flag.buf_encoded) c->last_header = NULL;
+ }
+ }
if (c->flag.pending_write) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flag.pending_write = 0;
@@ -609,7 +601,7 @@ int trySendWriteToIOThreads(client *c) {
}
/* Internal function to free the client's argv in an IO thread. */
-void IOThreadFreeArgv(robj **argv) {
+void ioThreadFreeArgv(robj **argv) {
int last_arg = 0;
for (int i = 0;; i++) {
robj *o = argv[i];
diff --git a/src/io_threads.h b/src/io_threads.h
index 455079d4ade..4202f6508a2 100644
--- a/src/io_threads.h
+++ b/src/io_threads.h
@@ -12,16 +12,14 @@ typedef enum {
JOB_REQ_ACCEPT,
JOB_REQ_COUNT
} JobRequest;
-_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");
+_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 8 for pointer arithmetic");
typedef enum {
JOB_RES_READ_CLIENT = 0,
JOB_RES_WRITE_CLIENT,
JOB_RES_COUNT
} JobResult;
-_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic");
-
-typedef void (*job_handler)(void *);
+_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 8 for pointer arithmetic");
void initIOThreads(int prev_threads_num);
void killIOThreads(void);
diff --git a/src/networking.c b/src/networking.c
index c758abe4bff..ee24eb2e43c 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -6532,31 +6532,16 @@ void evictClients(void) {
size_t client_eviction_limit = getClientEvictionLimit();
if (client_eviction_limit == 0) return;
- /* Variable to track memory of clients marked for close but not yet freed */
- size_t pending_freed = 0;
-
while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
- server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
- pending_freed >
+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >
client_eviction_limit) {
listNode *ln = listNext(&bucket_iter);
if (ln) {
client *c = ln->value;
- if (c->flag.close_asap) {
- /* Already scheduled to close. Count memory as freed and skip. */
- pending_freed += getClientMemoryUsage(c, NULL);
- continue;
- }
sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Evicting client: %s", ci);
+ if (freeClient(c)) server.stat_evictedclients++;
sdsfree(ci);
- server.stat_evictedclients++;
-
- if (freeClient(c) == 0) {
- /* Protected client (async close). Count memory as freed and skip. */
- pending_freed += getClientMemoryUsage(c, NULL);
- continue;
- }
} else {
curr_bucket--;
if (curr_bucket < 0) {
diff --git a/src/queues.c b/src/queues.c
index d2146d5d9a9..2bb5605d9bb 100644
--- a/src/queues.c
+++ b/src/queues.c
@@ -4,24 +4,27 @@
* SPDX-License-Identifier: BSD-3-Clause
*
* Implementation of MPSC, SPMC, and SPSC queues
- *
*/
#include "queues.h"
#include "zmalloc.h"
-inline void mpscInit(mpscQueue *q) {
- q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * MPSC_QUEUE_SIZE);
+
+void mpscInit(mpscQueue *q, size_t queue_size) {
+ /* Queue size must be a power of 2 (the masking logic relies on it) */
+ assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0);
+ q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * queue_size);
+ q->queue_size = queue_size;
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->head_cache, 0);
q->tail_cache = 0;
- for (size_t i = 0; i < MPSC_QUEUE_SIZE; ++i) {
+ for (size_t i = 0; i < q->queue_size; ++i) {
atomic_init(&q->buffer[i], NULL);
}
}
-inline void mpscFree(mpscQueue *q) {
+void mpscFree(mpscQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
@@ -32,7 +35,7 @@ inline void mpscFree(mpscQueue *q) {
q->tail_cache = 0;
}
-inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
+bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
size_t tail;
assert(data);
@@ -45,12 +48,12 @@ inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
/* Check limits (Fullness check) */
size_t head = atomic_load_explicit(&q->head_cache, memory_order_acquire);
- if ((tail - head) >= MPSC_QUEUE_SIZE) {
+ if ((tail - head) >= q->queue_size) {
/* Cached limit reached, refresh from actual head */
head = atomic_load_explicit(&q->head, memory_order_acquire);
atomic_store_explicit(&q->head_cache, head, memory_order_release);
- if (unlikely((tail - head) >= MPSC_QUEUE_SIZE)) {
+ if (unlikely((tail - head) >= q->queue_size)) {
/* Queue is full - Persist reservation for retry */
ticket->index = tail;
ticket->has_reservation = true;
@@ -59,13 +62,13 @@ inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
}
/* Commit data */
- atomic_store_explicit(&q->buffer[tail & MPSC_QUEUE_MASK], data, memory_order_release);
+ atomic_store_explicit(&q->buffer[tail & (q->queue_size - 1)], data, memory_order_release);
ticket->has_reservation = false;
return true;
}
-inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) {
+size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) {
size_t popped_count = 0;
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
size_t tail = q->tail_cache;
@@ -81,13 +84,13 @@ inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) {
if (limit > max_jobs) limit = max_jobs;
for (size_t i = 0; i < limit; ++i) {
- void *data = atomic_load_explicit(&q->buffer[head & MPSC_QUEUE_MASK], memory_order_relaxed);
+ void *data = atomic_load_explicit(&q->buffer[head & (q->queue_size - 1)], memory_order_relaxed);
/* Stop if slot is reserved but data not yet written */
if (!data) break;
jobs_out[popped_count++] = data;
- atomic_store_explicit(&q->buffer[head & MPSC_QUEUE_MASK], NULL, memory_order_relaxed);
+ atomic_store_explicit(&q->buffer[head & (q->queue_size - 1)], NULL, memory_order_relaxed);
head++;
}
@@ -103,19 +106,22 @@ inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) {
* SPMC QUEUE (Single-Producer Multi-Consumer)
* ========================================================================== */
-inline void spmcInit(spmcQueue *q) {
- q->buffer = (spmcCell *)zmalloc_cache_aligned(sizeof(spmcCell) * SPMC_QUEUE_SIZE);
+inline void spmcInit(spmcQueue *q, size_t queue_size) {
+ /* Queue size must be a power of 2 (the masking logic relies on it) */
+ assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0);
+ q->buffer = (spmcCell *)zmalloc_cache_aligned(sizeof(spmcCell) * queue_size);
+ q->queue_size = queue_size;
atomic_init(&q->head, 0);
q->tail = 0;
q->head_cache = 0;
- for (size_t i = 0; i < SPMC_QUEUE_SIZE; i++) {
+ for (size_t i = 0; i < q->queue_size; i++) {
atomic_init(&q->buffer[i].sequence, i);
q->buffer[i].data = NULL;
}
}
-inline void spmcFree(spmcQueue *q) {
+void spmcFree(spmcQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
@@ -125,7 +131,7 @@ inline void spmcFree(spmcQueue *q) {
q->head_cache = 0;
}
-inline bool spmcIsEmpty(spmcQueue *q) {
+bool spmcIsEmpty(spmcQueue *q) {
/* Fast path: Check against cached consumer position */
if (q->tail == q->head_cache) {
return true;
@@ -138,13 +144,13 @@ inline bool spmcIsEmpty(spmcQueue *q) {
return q->tail == curr_head;
}
-inline size_t spmcSize(spmcQueue *q) {
+size_t spmcSize(spmcQueue *q) {
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
return (q->tail >= head) ? (q->tail - head) : 0;
}
-inline bool spmcEnqueue(spmcQueue *q, void *data) {
- spmcCell *cell = &q->buffer[q->tail & SPMC_QUEUE_MASK];
+bool spmcEnqueue(spmcQueue *q, void *data) {
+ spmcCell *cell = &q->buffer[q->tail & (q->queue_size - 1)];
size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
/* Sequence Check:
@@ -163,13 +169,13 @@ inline bool spmcEnqueue(spmcQueue *q, void *data) {
return true;
}
-inline void *spmcDequeue(spmcQueue *q) {
+void *spmcDequeue(spmcQueue *q) {
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
spmcCell *cell;
void *data;
while (1) {
- cell = &q->buffer[head & SPMC_QUEUE_MASK];
+ cell = &q->buffer[head & (q->queue_size - 1)];
size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1);
@@ -182,7 +188,7 @@ inline void *spmcDequeue(spmcQueue *q) {
data = cell->data;
/* Mark slot empty for next generation (pos + size) */
- atomic_store_explicit(&cell->sequence, head + SPMC_QUEUE_SIZE, memory_order_release);
+ atomic_store_explicit(&cell->sequence, head + q->queue_size, memory_order_release);
return data;
}
} else if (diff < 0) {
@@ -199,8 +205,11 @@ inline void *spmcDequeue(spmcQueue *q) {
* SPSC QUEUE (Single-Producer Single-Consumer)
* ========================================================================== */
-inline void spscInit(spscQueue *q) {
- q->buffer = (void **)zmalloc(sizeof(void *) * SPSC_QUEUE_SIZE);
+void spscInit(spscQueue *q, size_t queue_size) {
+ /* Queue size must be a power of 2 (the masking logic relies on it) */
+ assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0);
+ q->buffer = (void **)zmalloc(sizeof(void *) * queue_size);
+ q->queue_size = queue_size;
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
q->head_cache = 0;
@@ -208,7 +217,7 @@ inline void spscInit(spscQueue *q) {
q->tail_local = 0;
}
-inline void spscFree(spscQueue *q) {
+void spscFree(spscQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
@@ -220,13 +229,13 @@ inline void spscFree(spscQueue *q) {
q->tail_local = 0;
}
-inline bool spscIsFull(spscQueue *q) {
+bool spscIsFull(spscQueue *q) {
const size_t curr_tail = q->tail_local;
- if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) {
+ if (curr_tail - q->head_cache >= q->queue_size) {
q->head_cache = atomic_load_explicit(&q->head, memory_order_acquire);
- if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) {
+ if (curr_tail - q->head_cache >= q->queue_size) {
/* Flush any local changes before reporting full */
if (q->tail_local != q->tail) {
atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
@@ -237,8 +246,8 @@ inline bool spscIsFull(spscQueue *q) {
return false;
}
-inline void spscEnqueue(spscQueue *q, void *data, bool commit) {
- q->buffer[q->tail_local & SPSC_QUEUE_MASK] = data;
+void spscEnqueue(spscQueue *q, void *data, bool commit) {
+ q->buffer[q->tail_local & (q->queue_size - 1)] = data;
q->tail_local++;
if (commit) {
@@ -246,13 +255,13 @@ inline void spscEnqueue(spscQueue *q, void *data, bool commit) {
}
}
-inline void spscCommit(spscQueue *q) {
+void spscCommit(spscQueue *q) {
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
if (q->tail_local == tail) return;
atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}
-inline size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) {
+size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) {
size_t curr_head = atomic_load_explicit(&q->head, memory_order_relaxed);
size_t curr_tail_cache = q->tail_cache;
@@ -266,13 +275,13 @@ inline size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) {
size_t count = (num_jobs < available) ? num_jobs : available;
for (size_t i = 0; i < count; ++i) {
- jobs_out[i] = q->buffer[(curr_head + i) & SPSC_QUEUE_MASK];
+ jobs_out[i] = q->buffer[(curr_head + i) & (q->queue_size - 1)];
}
atomic_store_explicit(&q->head, curr_head + count, memory_order_release);
return count;
}
-inline bool spscIsEmpty(spscQueue *q) {
+bool spscIsEmpty(spscQueue *q) {
/* Fast path */
if (q->tail_local == q->head_cache) {
return true;
diff --git a/src/queues.h b/src/queues.h
index 4b6e4bc53e6..8c10649473a 100644
--- a/src/queues.h
+++ b/src/queues.h
@@ -37,12 +37,6 @@
* MPSC QUEUE (Multi-Producer Single-Consumer)
* ========================================================================== */
-#define MPSC_QUEUE_SIZE 16384
-#define MPSC_QUEUE_MASK (MPSC_QUEUE_SIZE - 1)
-#ifndef __cplusplus
-static_assert((MPSC_QUEUE_SIZE & (MPSC_QUEUE_SIZE - 1)) == 0, "MPSC_QUEUE_SIZE must be power of 2");
-#endif
-
typedef struct mpscTicket {
size_t index;
bool has_reservation;
@@ -59,9 +53,12 @@ typedef struct mpscQueue {
/* Data buffer */
_Alignas(CACHE_LINE_SIZE) _Atomic(void *) *buffer;
+ size_t queue_size;
} mpscQueue;
-void mpscInit(mpscQueue *q);
+/* Initializes an MPSC queue with a size that must be a power of 2 */
+void mpscInit(mpscQueue *q, size_t queue_size);
+/* Frees the MPSC queue's internal buffer and resets its state */
void mpscFree(mpscQueue *q);
/* Pushes an item into the queue and returns true if the queue is not full.
@@ -77,12 +74,6 @@ size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs);
* SPMC QUEUE (Single-Producer Multi-Consumer)
* ========================================================================== */
-#define SPMC_QUEUE_SIZE 4096
-#define SPMC_QUEUE_MASK (SPMC_QUEUE_SIZE - 1)
-#ifndef __cplusplus
-static_assert((SPMC_QUEUE_SIZE & (SPMC_QUEUE_SIZE - 1)) == 0, "SPMC_QUEUE_SIZE must be power of 2");
-#endif
-
typedef struct spmcCell {
_Alignas(CACHE_LINE_SIZE) _Atomic(size_t) sequence;
void *data;
@@ -98,25 +89,26 @@ typedef struct spmcQueue {
/* Data buffer */
_Alignas(CACHE_LINE_SIZE) spmcCell *buffer;
+ size_t queue_size;
} spmcQueue;
-void spmcInit(spmcQueue *q);
+/* Initializes an SPMC queue with a size that must be a power of 2 */
+void spmcInit(spmcQueue *q, size_t queue_size);
+/* Frees the SPMC queue's internal buffer and resets its state */
void spmcFree(spmcQueue *q);
+/* Returns true if the SPMC queue has no items */
bool spmcIsEmpty(spmcQueue *q);
+/* Returns an approximate number of items currently in the queue */
size_t spmcSize(spmcQueue *q);
+/* Pushes an item to the SPMC queue. Returns true on success, false if the queue is full. */
bool spmcEnqueue(spmcQueue *q, void *data);
+/* Pops and returns the next item from the queue, or NULL if the queue is empty */
void *spmcDequeue(spmcQueue *q);
/* ==========================================================================
* SPSC QUEUE (Single-Producer Single-Consumer)
* ========================================================================== */
-#define SPSC_QUEUE_SIZE 4096
-#define SPSC_QUEUE_MASK (SPSC_QUEUE_SIZE - 1)
-#ifndef __cplusplus
-static_assert((SPSC_QUEUE_SIZE & (SPSC_QUEUE_SIZE - 1)) == 0, "SPSC_QUEUE_SIZE must be power of 2");
-#endif
-
typedef struct spscQueue {
/* Consumer cache line */
_Alignas(CACHE_LINE_SIZE) _Atomic(size_t) head;
@@ -129,16 +121,22 @@ typedef struct spscQueue {
/* Dynamic buffer */
_Alignas(CACHE_LINE_SIZE) void **buffer;
+ size_t queue_size;
} spscQueue;
-void spscInit(spscQueue *q);
+/* Initializes an SPSC queue with a size that must be a power of 2 */
+void spscInit(spscQueue *q, size_t queue_size);
+/* Frees the SPSC queue's internal buffer and resets its state */
void spscFree(spscQueue *q);
+/* Returns true if the queue is full, or false otherwise */
bool spscIsFull(spscQueue *q);
/* Push data to the queue. Caller must ensure queue is not full via spscIsFull().
* If commit is true, the tail pointer is updated immediately (visible to consumer) else,
* only local index is updated (batching). */
void spscEnqueue(spscQueue *q, void *data, bool commit);
+/* Publishes any pending batched enqueues by advancing the shared tail pointer */
void spscCommit(spscQueue *q);
+/* Pops up to num_jobs items from the queue and returns the actual number popped */
size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs);
/* Check if queue is empty from producer's perspective. */
bool spscIsEmpty(spscQueue *q);
diff --git a/src/server.h b/src/server.h
index fa6ebf3a8a3..9ae9fd30be0 100644
--- a/src/server.h
+++ b/src/server.h
@@ -195,17 +195,16 @@ struct ValkeyModule;
/* Instantaneous metrics tracking. */
#define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */
typedef enum {
- STATS_METRIC_COMMAND = 0, /* Number of commands executed. */
- STATS_METRIC_NET_INPUT, /* Bytes read from network. */
- STATS_METRIC_NET_OUTPUT, /* Bytes written to network. */
- STATS_METRIC_NET_INPUT_REPLICATION, /* Bytes read from network during replication. */
- STATS_METRIC_NET_OUTPUT_REPLICATION, /* Bytes written to network during replication. */
- STATS_METRIC_EL_CYCLE, /* Number of eventloop cycled. */
- STATS_METRIC_EL_DURATION, /* Eventloop duration. */
- STATS_METRIC_IO_WAIT, /* IO queue size */
- STATS_METRIC_MAIN_THREAD_CPU_SYS, /* Main thread CPU sys time */
- STATS_METRIC_MAIN_THREAD_CPU_USER, /* Main thread CPU user time */
- STATS_METRIC_COUNT /* Total count */
+ STATS_METRIC_COMMAND = 0, /* Number of commands executed. */
+ STATS_METRIC_NET_INPUT, /* Bytes read from network. */
+ STATS_METRIC_NET_OUTPUT, /* Bytes written to network. */
+ STATS_METRIC_NET_INPUT_REPLICATION, /* Bytes read from network during replication. */
+ STATS_METRIC_NET_OUTPUT_REPLICATION, /* Bytes written to network during replication. */
+ STATS_METRIC_EL_CYCLE, /* Number of eventloop cycled. */
+ STATS_METRIC_EL_DURATION, /* Eventloop duration. */
+ STATS_METRIC_IO_WAIT, /* IO queue size */
+ STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, /* Main-thread active time */
+ STATS_METRIC_COUNT /* Total count */
} instantaneous_metric_type;
/* Protocol and I/O related defines */
diff --git a/src/unit/test_queues.cpp b/src/unit/test_queues.cpp
index 05b7687a6ee..729315b0908 100644
--- a/src/unit/test_queues.cpp
+++ b/src/unit/test_queues.cpp
@@ -22,6 +22,7 @@ extern "C" {
class SpscQueueTest : public ::testing::Test {
protected:
spscQueue q;
+ static constexpr size_t SPSC_QUEUE_SIZE = 4096;
struct ConsumerArg {
spscQueue *q;
@@ -29,7 +30,7 @@ class SpscQueueTest : public ::testing::Test {
};
void SetUp() override {
- spscInit(&q);
+ spscInit(&q, SPSC_QUEUE_SIZE);
}
void TearDown() override {
@@ -139,6 +140,7 @@ TEST_F(SpscQueueTest, TestSpscConcurrent) {
class SpmcQueueTest : public ::testing::Test {
protected:
spmcQueue q;
+ static constexpr size_t SPMC_QUEUE_SIZE = 4096;
struct ConsumerArg {
spmcQueue *q;
@@ -147,7 +149,7 @@ class SpmcQueueTest : public ::testing::Test {
};
void SetUp() override {
- spmcInit(&q);
+ spmcInit(&q, SPMC_QUEUE_SIZE);
ASSERT_NE(q.buffer, nullptr);
EXPECT_EQ(reinterpret_cast(q.buffer) % CACHE_LINE_SIZE, 0u);
}
@@ -251,6 +253,7 @@ TEST_F(SpmcQueueTest, TestSpmcConcurrent) {
class MpscQueueTest : public ::testing::Test {
protected:
mpscQueue q;
+ static constexpr size_t MPSC_QUEUE_SIZE = 16384;
struct ProducerArg {
mpscQueue *q;
@@ -259,7 +262,7 @@ class MpscQueueTest : public ::testing::Test {
};
void SetUp() override {
- mpscInit(&q);
+ mpscInit(&q, MPSC_QUEUE_SIZE);
}
void TearDown() override {
From 3b6d21680aad929146b53aac8424016fe73bd2e6 Mon Sep 17 00:00:00 2001
From: Daniil Kashapov
Date: Thu, 11 Jun 2026 02:45:40 +0500
Subject: [PATCH 02/15] Omit alldbs rule in ACL SAVE/LIST and CONFIG REWRITE
for compatibility (#3964)
Database-level ACL #2309 introduced `alldbs` rule that was explicit for
all users and because of that previous versions no longer had the
ability to parse ACL strings produced by later versions.
Omit `alldbs` in `ACLDescribeSelector()`, that is used in `ACL
SAVE/LOAD` and `CONFIG REWRITE` command paths so that downgrades would
be possible if new feature was not used (`db=` and `resetdbs` rules).
Keep `ACL GETUSER` command's output as is and return `alldbs` in
`databases` field because of command's field-value format.
Add test to check that `ACL LIST` omits implicit `alldbs` and add check
to existing `ACL SAVE` and `CONFIG REWRITE` tests.
Fixes #3915
Signed-off-by: Daniil Kashapov
---
src/acl.c | 2 +-
tests/unit/acl-v2.tcl | 18 ++++++++++++++++++
tests/unit/acl.tcl | 17 +++++++++++++++--
tests/unit/moduleapi/usercall.tcl | 6 +++---
4 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/src/acl.c b/src/acl.c
index 74ce448d334..e2765ba716b 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -869,7 +869,7 @@ static sds ACLDescribeSelector(aclSelector *selector) {
/* Database permissions. */
if (selector->flags & SELECTOR_FLAG_ALLDBS) {
- res = sdscatlen(res, "alldbs ", 7);
+ /* alldbs is default, avoid emitting it in ACL strings for compatibility. */
} else if (intsetLen(selector->dbs) == 0) {
res = sdscatlen(res, "resetdbs ", 9);
} else {
diff --git a/tests/unit/acl-v2.tcl b/tests/unit/acl-v2.tcl
index 2158e10324b..6822c68154d 100644
--- a/tests/unit/acl-v2.tcl
+++ b/tests/unit/acl-v2.tcl
@@ -347,6 +347,24 @@ start_server {tags {"acl external:skip"}} {
assert_equal "" [dict get $secondary_selector databases]
}
+ test {Test ACL LIST omits implicit alldbs} {
+ set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user default*"]]
+ assert_no_match "* alldbs *" $response
+
+ r ACL SETUSER user-list-alldbs on nopass -@all +get ~* &* alldbs
+ set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]]
+ assert_no_match "* alldbs *" $response
+
+ r ACL SETUSER user-list-alldbs db=0,1
+ set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]]
+ assert_match "* db=0,1 *" $response
+
+ r ACL SETUSER user-list-alldbs resetdbs
+ set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]]
+ assert_match "* resetdbs *" $response
+
+ r ACL DELUSER user-list-alldbs
+ }
test {Test ACL list idempotency} {
r ACL SETUSER user-idempotency off -@all +get resetchannels &channel1 %R~foo1 %W~bar1 ~baz1 (-@all +set resetchannels &channel2 %R~foo2 %W~bar2 ~baz2)
diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl
index c1b827c42b0..bf262447a35 100644
--- a/tests/unit/acl.tcl
+++ b/tests/unit/acl.tcl
@@ -179,7 +179,7 @@ start_server {tags {"acl external:skip"}} {
set curruser "hpuser"
foreach user [lshuffle $users] {
if {[string first $curruser $user] != -1} {
- assert_equal {user hpuser on nopass sanitize-payload resetchannels &foo alldbs +@all} $user
+ assert_equal {user hpuser on nopass sanitize-payload resetchannels &foo +@all} $user
}
}
@@ -1145,9 +1145,16 @@ start_server [list overrides [list "dir" $server_path "acl-pubsub-default" "allc
} {} {needs:debug}
test {ACL load and save} {
- r ACL setuser eve +get allkeys >eve on
+ r ACL setuser eve +get allkeys >eve on alldbs
r ACL save
+ # ACL SAVE uses the same serialization as ACL LIST,
+ # verify that ACL file omits the implicit alldbs rule.
+ set aclfile [file join \
+ [lindex [r CONFIG GET dir] 1] \
+ [lindex [r CONFIG GET aclfile] 1]]
+ assert_equal 0 [count_message_lines $aclfile alldbs]
+
r ACL load
# Clients should not be disconnected since permissions haven't changed
@@ -1357,6 +1364,12 @@ start_server {overrides {user "default on nopass ~* +@all -flushdb"} tags {acl e
test {ACL from config file and config rewrite} {
assert_error {NOPERM *} {r flushdb}
r config rewrite
+
+ # CONFIG REWRITE persists ACL users through the ACL string
+ # serializer, and should not have the implicit alldbs rule.
+ set config_file [srv 0 config_file]
+ assert_equal 0 [count_message_lines $config_file alldbs]
+
restart_server 0 true false
assert_error {NOPERM *} {r flushdb}
}
diff --git a/tests/unit/moduleapi/usercall.tcl b/tests/unit/moduleapi/usercall.tcl
index 80c9f883de8..29e5ff9797c 100644
--- a/tests/unit/moduleapi/usercall.tcl
+++ b/tests/unit/moduleapi/usercall.tcl
@@ -29,7 +29,7 @@ start_server {tags {"modules usercall network"}} {
assert_equal [r usercall.reset_user] OK
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
# off and sanitize-payload because module user / default value
- assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set"
+ assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set"
# doesn't fail for regular commands as just testing acl here
assert_equal [r usercall.$cmd {} set x 10] OK
@@ -48,7 +48,7 @@ start_server {tags {"modules usercall network"}} {
assert_equal [r usercall.reset_user] OK
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
# off and sanitize-payload because module user / default value
- assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set"
+ assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set"
# fails here as testing acl in rm call
assert_error {*NOPERM User module_user has no permissions*} {r usercall.$cmd C set x 10}
@@ -111,7 +111,7 @@ start_server {tags {"modules usercall network"}} {
assert_equal [r usercall.reset_user] OK
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
# off and sanitize-payload because module user / default value
- assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set"
+ assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set"
# passes as not checking ACL
assert_equal [r usercall.$cmd {} evalsha $sha_set 0] 1
From 2012b9f0c93bf5f2655a827f436f52a4a6147a45 Mon Sep 17 00:00:00 2001
From: Madelyn Olson
Date: Thu, 11 Jun 2026 14:01:08 -0700
Subject: [PATCH 03/15] Reject integer overflow of length fields in
zipmapValidateIntegrity (#3920)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
## Problem
A crafted zipmap entry can set the value length to a value near
`UINT32_MAX` so that the `l + e` sum (value length + one-byte free
space) wraps in `unsigned int` arithmetic. The wrapped sum advances the
validation cursor by a tiny amount, leaving `p` inside the buffer, so
the `OUT_OF_RANGE` check passes and `zipmapValidateIntegrity` wrongly
returns success. The field-length path has the same shape — advancing
`p` by a ~4GB length wraps the pointer on 32-bit builds.
`zipmapValidateIntegrity` is always called with `deep=1` from `rdb.c`
when loading `RDB_TYPE_HASH_ZIPMAP`, **including via `RESTORE`**, so any
client with `RESTORE` access can submit a payload that passes
validation. On 32-bit platforms this leads to out-of-bounds access
during the subsequent zipmap→listpack conversion. On 64-bit the
downstream `lpSafeToAdd` cap happens to reject it (the raw ~4GB length
exceeds `LISTPACK_MAX_SAFETY_SIZE`), but the validator should not accept
a malformed payload in the first place — this is the function whose sole
job is to reject it.
## Fix
Bounds-check the attacker-controlled length against the bytes remaining
in the zipmap, in 64-bit space, **before** any pointer arithmetic, for
both the field-length and value-length paths.
## Testing
- `tests/integration/corrupt-dump.tcl`: a `RESTORE`-path test exercising
the full attack surface; asserts rejection and that the server stays up.
- Verified the test **fails on the pre-fix code** (validator accepts the
value-length payload) and **passes after the fix**, confirmed by
stashing the fix during the integration run.
- Full `integration/corrupt-dump` suite: 76 passed, 0 failed.
> [!NOTE]
> Found via structure-aware fuzzing of the RESTORE path. This issue was
generated by AI but verified, with love, by a human.
Signed-off-by: Madelyn Olson
---
src/zipmap.c | 15 +++++++++++++--
tests/integration/corrupt-dump.tcl | 16 ++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/src/zipmap.c b/src/zipmap.c
index 5214eaa5782..f4e12409dfc 100644
--- a/src/zipmap.c
+++ b/src/zipmap.c
@@ -201,6 +201,12 @@ int zipmapValidateIntegrity(unsigned char *zm, size_t size, int deep) {
return 0;
p += s; /* skip the encoded field size */
+ /* Bounds-check the field length before advancing 'p'. Advancing the
+ * pointer first can wrap, since 'l' is an attacker-controlled 32-bit
+ * value, so the OUT_OF_RANGE check below could be bypassed. Compare
+ * against the bytes remaining in the zipmap in 64-bit space so the
+ * arithmetic cannot overflow on 32-bit platforms either (CWE-190). */
+ if ((uint64_t)l > (uint64_t)(zm + size - 1 - p)) return 0;
p += l; /* skip the field */
/* make sure the entry doesn't reach outside the edge of the zipmap */
@@ -216,8 +222,13 @@ int zipmapValidateIntegrity(unsigned char *zm, size_t size, int deep) {
/* Sanity check: length < 254 must be encoded in 1 byte, not 5 bytes */
if (l < ZIPMAP_BIGLEN && s != 1)
return 0;
- p += s; /* skip the encoded value size*/
- e = *p++; /* skip the encoded free space (always encoded in one byte) */
+ p += s; /* skip the encoded value size*/
+ e = *p++; /* skip the encoded free space (always encoded in one byte) */
+ /* Bounds-check 'l + e' before advancing 'p'. Both are unsigned int,
+ * so 'l + e' can wrap to a small value (e.g. l=0xFFFFFFFF, e=1) and
+ * slip past the OUT_OF_RANGE check below, defeating validation. Do
+ * the comparison in 64-bit space so it cannot overflow (CWE-190). */
+ if ((uint64_t)l + e > (uint64_t)(zm + size - 1 - p)) return 0;
p += l + e; /* skip the value and free space */
count++;
diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl
index 629b4ec3035..c4f03d3f4f0 100644
--- a/tests/integration/corrupt-dump.tcl
+++ b/tests/integration/corrupt-dump.tcl
@@ -39,6 +39,22 @@ test {corrupt payload: hash with valid zip list header, invalid entry len} {
}
}
+test {corrupt payload: zipmap value length integer overflow} {
+ # A zipmap value length of 0xFFFFFFFF with a free byte of 1 makes the
+ # internal 'l + e' sum wrap to 0 in 32-bit arithmetic. Before the fix the
+ # validator advanced its cursor by the wrapped value and wrongly accepted
+ # the payload, which leads to out-of-bounds access on 32-bit builds.
+ start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
+ r debug set-skip-checksum-validation 1
+ catch {
+ r restore key 0 "\x09\x0c\x01\x03\x66\x6f\x6f\xfe\xff\xff\xff\xff\x01\xff\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ } err
+ assert_match "*Bad data format*" $err
+ verify_log_message 0 "*Zipmap integrity check failed*" 0
+ assert_equal [r ping] "PONG"
+ }
+}
+
test {corrupt payload: invalid zlbytes header} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
catch {
From 215045eb95285248f0729e95d8d76a02bcf4f166 Mon Sep 17 00:00:00 2001
From: Madelyn Olson
Date: Thu, 11 Jun 2026 14:01:28 -0700
Subject: [PATCH 04/15] Reject NAN scores in listpack/ziplist-encoded sorted
sets on RDB load (#3921)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
## Problem
A crafted `RESTORE` payload can store a `NAN` score in a
listpack-encoded sorted set. The integrity validation
(`lpValidateIntegrityAndDups`) only checks the listpack *structure* and
member uniqueness — it does not check score validity — so the payload is
accepted on load.
When the sorted set is later converted to a skiplist (e.g. when it grows
past `zset-max-listpack-entries`, or via any operation that triggers
conversion), `zslInsertNode()` asserts the score is not `NAN`
(`t_zset.c:260`) and the server aborts. **Any client with `RESTORE`
access can remotely crash the server.**
The skiplist RDB format (`RDB_TYPE_ZSET` / `RDB_TYPE_ZSET_2`) already
rejects `NAN` scores at load time (`rdb.c`, "Zset with NAN score
detected"). The listpack format (`RDB_TYPE_ZSET_LISTPACK`) had no
equivalent check.
## Reproduction
```
RESTORE k 0 "\x11\x19\x19\x00\x00\x00\x04\x00\x82m1\x03\x83nan\x04\x82m2\x03\x832.5\x04\xFF\x50\x00...."
# loads OK, then:
ZADD k 9 x # forces listpack->skiplist conversion -> serverAssert(!isnan(node->score)) -> SIGABRT
```
## Fix
Add `zzlValidateScores()`, which scans the scores of a listpack zset
after structural validation and rejects the payload if any score is
`NAN`. Mirrors the existing skiplist-format check. `inf`/`-inf` and
large finite scores remain accepted (only `NAN` is rejected), matching
normal `ZADD` semantics.
## Testing
- `tests/integration/corrupt-dump.tcl`: a `RESTORE`-path test asserting
rejection and that the server stays up.
- Verified the test **fails on the pre-fix code** (server crashes on
conversion) and **passes after the fix**, by stashing the fix during the
run.
- Confirmed valid zsets, including `inf`/`-inf`/large finite scores,
still load and convert correctly.
- Full `integration/corrupt-dump` suite: 74 passed, 0 failed.
> [!NOTE]
> Found via structure-aware fuzzing of the RESTORE path. This issue was
generated by AI but verified, with love, by a human.
Signed-off-by: Madelyn Olson
---
src/rdb.c | 23 +++++++++++++++++++++++
src/server.h | 1 +
src/t_zset.c | 19 +++++++++++++++++++
tests/integration/corrupt-dump.tcl | 28 ++++++++++++++++++++++++++++
4 files changed, 71 insertions(+)
diff --git a/src/rdb.c b/src/rdb.c
index fd66147fc9b..aefb3707486 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -2503,6 +2503,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd
decrRefCount(o);
return NULL;
}
+ /* See the RDB_TYPE_ZSET_LISTPACK case: a NAN score would crash the
+ * server when the zset is converted to a skiplist. The legacy
+ * ziplist format is converted to a listpack above, so apply the
+ * same NAN check on the resulting listpack. */
+ if (!zzlValidateScores(lp)) {
+ rdbReportCorruptRDB("Zset ziplist with NAN score detected");
+ zfree(lp);
+ zfree(encoded);
+ objectSetVal(o, NULL);
+ decrRefCount(o);
+ return NULL;
+ }
zfree(objectGetVal(o));
o->type = OBJ_ZSET;
@@ -2528,6 +2540,17 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd
decrRefCount(o);
return NULL;
}
+ /* A NAN score would crash the server when the zset is converted to
+ * a skiplist (zslInsertNode asserts the score is not NAN). The
+ * skiplist RDB format rejects NAN scores at load time; do the same
+ * for the listpack format. */
+ if (!zzlValidateScores(encoded)) {
+ rdbReportCorruptRDB("Zset listpack with NAN score detected");
+ zfree(encoded);
+ objectSetVal(o, NULL);
+ decrRefCount(o);
+ return NULL;
+ }
o->type = OBJ_ZSET;
o->encoding = OBJ_ENCODING_LISTPACK;
if (zsetLength(o) == 0) {
diff --git a/src/server.h b/src/server.h
index 9ae9fd30be0..735434a4b7d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3422,6 +3422,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, const_sds ele);
zskiplistNode *zslNthInRange(zskiplist *zsl, zrangespec *range, long n, long *rank);
sds zslGetNodeElement(const zskiplistNode *x);
double zzlGetScore(unsigned char *sptr);
+int zzlValidateScores(unsigned char *zl);
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range);
diff --git a/src/t_zset.c b/src/t_zset.c
index b2c651b9345..a564aba7c70 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -866,6 +866,25 @@ double zzlGetScore(unsigned char *sptr) {
return score;
}
+/* Validate that none of the scores in a listpack-encoded sorted set is NAN.
+ * The structural layout (member, score, member, score, ...) must already have
+ * been validated by lpValidateIntegrityAndDups. Returns 1 if all scores are
+ * valid, 0 if a NAN score is found. This guards against crafted RESTORE
+ * payloads: zslInsertNode() asserts the score is not NAN, so a NAN score in a
+ * listpack zset would crash the server when it is later converted to a
+ * skiplist. The skiplist RDB format rejects NAN scores at load time; this is
+ * the equivalent check for the listpack format. */
+int zzlValidateScores(unsigned char *zl) {
+ unsigned char *eptr = lpSeek(zl, 0), *sptr;
+ while (eptr != NULL) {
+ sptr = lpNext(zl, eptr);
+ if (sptr == NULL) return 0; /* odd number of elements */
+ if (isnan(zzlGetScore(sptr))) return 0;
+ eptr = lpNext(zl, sptr);
+ }
+ return 1;
+}
+
/* Return a listpack element as an SDS string. */
sds lpGetObject(unsigned char *sptr) {
unsigned char *vstr;
diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl
index c4f03d3f4f0..e8a5c3c3f98 100644
--- a/tests/integration/corrupt-dump.tcl
+++ b/tests/integration/corrupt-dump.tcl
@@ -796,6 +796,34 @@ test {corrupt payload: fuzzer findings - zset zslInsert with a NAN score} {
}
}
+test {corrupt payload: zset listpack with NAN score} {
+ # A listpack-encoded sorted set whose score parses to NAN must be rejected
+ # on load. zslInsertNode() asserts the score is not NAN, so otherwise the
+ # server would crash when the zset is converted to a skiplist (e.g. when it
+ # grows past zset-max-listpack-entries). The skiplist RDB format already
+ # rejects NAN scores; this covers the listpack format.
+ start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
+ r debug set-skip-checksum-validation 1
+ catch {r restore _nan_zset_lp 0 "\x11\x19\x19\x00\x00\x00\x04\x00\x82\x6D\x31\x03\x83\x6E\x61\x6E\x04\x82\x6D\x32\x03\x83\x32\x2E\x35\x04\xFF\x50\x00\xC5\x5C\xC6\x0C\x7D\xFF\xB5\x52"} err
+ assert_match "*Bad data format*" $err
+ verify_log_message 0 "*NAN score*" 0
+ assert_equal [r ping] "PONG"
+ }
+}
+
+test {corrupt payload: zset ziplist with NAN score} {
+ # Same as the listpack case but for the legacy ziplist format, which is
+ # converted to a listpack on load. A NAN score must be rejected so it can
+ # not crash the server when the zset is later converted to a skiplist.
+ start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
+ r debug set-skip-checksum-validation 1
+ catch {r restore _nan_zset_zl 0 "\x0C\x1D\x1D\x00\x00\x00\x17\x00\x00\x00\x04\x00\x00\x02\x6D\x31\x04\x03\x6E\x61\x6E\x05\x02\x6D\x32\x04\x03\x32\x2E\x35\xFF\x0B\x00\x00\x00\x00\x00\x00\x00\x00\x00"} err
+ assert_match "*Bad data format*" $err
+ verify_log_message 0 "*NAN score*" 0
+ assert_equal [r ping] "PONG"
+ }
+}
+
test {corrupt payload: fuzzer findings - streamLastValidID panic} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload yes
From 2ad76292ac7ab82351f160df9782fb8bf60ea0d1 Mon Sep 17 00:00:00 2001
From: Binbin
Date: Fri, 12 Jun 2026 10:18:46 +0800
Subject: [PATCH 05/15] Stabilize CLUSTERSCAN unassigned-slot test by retrying
DELSLOTS (#3959)
The Case 3 portion of the test was flaky: after a single round of
`CLUSTER DELSLOTS 0` on R0/R1/R2, the cluster could stay in OK state
and `wait_for_cluster_state fail` would time out with
`Cluster node 1 cluster_state:ok`.
The race is between R0's local DELSLOTS and the gossip already in
flight from R0. After R1 locally clears slot 0, a stale pre-DELSLOTS
packet from R0 (whose myslots still claims slot 0) hits the
isSlotUnclaimed fast path in clusterUpdateSlotsConfigWith and rebinds
slot 0 back to R0 on R1. See:
```
if (isSlotUnclaimed(j) ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch ||
clusterSlotFailoverGranted(j)) {
...
clusterDelSlot(j);
clusterAddSlot(sender, j);
...
}
```
R0's subsequent "no longer claiming" PINGs cannot undo this, because
that path only sets owner_not_claiming_slot and never clears slots[j]:
```
if (server.cluster->slots[j] == sender) {
/* The slot is currently bound to the sender but the sender is no longer
* claiming it. We don't want to unbind the slot yet as it can cause the cluster
* to move to FAIL state and also throw client error. Keeping the slot bound to
* the previous owner will cause a few client side redirects, but won't throw
* any errors. We will keep track of the uncertainty in ownership to avoid
* propagating misinformation about this slot's ownership using UPDATE
* messages. */
bitmapSetBit(server.cluster->owner_not_claiming_slot, j);
}
```
Combined with clusterUpdateState's full-coverage check looking only
at slots[j] == NULL, R1 stays at cluster OK forever.
```
if (server.cluster->slots[j] == NULL || ...) {
new_state = CLUSTER_FAIL;
...
}
```
Rather than fighting the protocol's intentional asymmetry around
"soft delete" via gossip, just retry the DELSLOTS pass until all
three nodes converge to FAIL. This keeps the test focused on the
CLUSTERSCAN error semantics it actually wants to verify.
This closes #3891. The test was added in #3674.
Signed-off-by: Binbin
---
tests/unit/cluster/clusterscan.tcl | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/tests/unit/cluster/clusterscan.tcl b/tests/unit/cluster/clusterscan.tcl
index e65a5d5f6db..83a41fdf880 100644
--- a/tests/unit/cluster/clusterscan.tcl
+++ b/tests/unit/cluster/clusterscan.tcl
@@ -621,10 +621,20 @@ start_cluster 3 0 {tags {external:skip cluster}} {
# With full-coverage=yes the cluster enters FAIL state.
# Cursors for slot 0 should get "Hash slot not served".
# Cursors for assigned but remote slots should get "cluster is down".
- R 0 CLUSTER DELSLOTS 0
- catch {R 1 CLUSTER DELSLOTS 0}
- catch {R 2 CLUSTER DELSLOTS 0}
- wait_for_cluster_state fail
+ #
+ # Retry DELSLOTS in a loop: R0's old stale packet can rebind slot 0
+ # to R0 on R1/R2 and undoing the DELSLOTS. Loop until all nodes converge
+ # to FAIL with slot 0 unassigned.
+ wait_for_condition 1000 50 {
+ [catch {R 0 CLUSTER DELSLOTS 0}] >= 0 &&
+ [catch {R 1 CLUSTER DELSLOTS 0}] >= 0 &&
+ [catch {R 2 CLUSTER DELSLOTS 0}] >= 0 &&
+ [CI 0 cluster_state] eq "fail" &&
+ [CI 1 cluster_state] eq "fail" &&
+ [CI 2 cluster_state] eq "fail"
+ } else {
+ fail "Cluster did not converge to FAIL after DELSLOTS"
+ }
# Unassigned slot -> specific error.
assert_error {CLUSTERDOWN Hash slot not served} {R 0 clusterscan "0-{06S}-0"}
From ccce4f8d6f8bc71781bab72eab6087a4156fed3a Mon Sep 17 00:00:00 2001
From: Rick Ramsay <49293857+rickrams@users.noreply.github.com>
Date: Thu, 11 Jun 2026 19:48:27 -0700
Subject: [PATCH 06/15] Fix RESP3 type violation in addReplyCommandSubCommands
(#3939)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
## Summary
`addReplyCommandSubCommands` unconditionally called `addReplySetLen(c, 0)`
when a command has no subcommands, emitting a RESP3 Set type prefix (`~0`)
regardless of the `use_map` parameter. The non-empty path (below it) already
branches correctly on `use_map` — the empty early-return was simply missing
the same logic.
In RESP3, `COMMAND INFO ` returns the subcommands field as a Set (`~0`)
instead of an Array (`*0`) for any command without subcommands (e.g. PING).
Strict RESP3 client libraries that dispatch on collection type will
misinterpret the response. Not visible in RESP2 since both Set and Array use
the `*` prefix there.
## Fix
Apply the same `use_map` branch to the empty case:
- `addReplyMapLen(c, 0)` when `use_map=1`
- `addReplyArrayLen(c, 0)` otherwise
## Test
Added a `readraw` integration test in `tests/unit/introspection-2.tcl` that
inspects the raw wire-level type byte for the subcommands field of `COMMAND
INFO ping` in RESP3 mode, asserting `*0` (Array) rather than `~0` (Set).
Signed-off-by: Rick Ramsay <49293857+rickrams@users.noreply.github.com>
Signed-off-by: rickrams
---
src/server.c | 5 ++++-
tests/unit/introspection-2.tcl | 28 ++++++++++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/src/server.c b/src/server.c
index e018a6182ae..1d8fade177a 100644
--- a/src/server.c
+++ b/src/server.c
@@ -5321,7 +5321,10 @@ void addReplyCommandSubCommands(client *c,
void (*reply_function)(client *, struct serverCommand *),
int use_map) {
if (!cmd->subcommands_ht) {
- addReplySetLen(c, 0);
+ if (use_map)
+ addReplyMapLen(c, 0);
+ else
+ addReplyArrayLen(c, 0);
return;
}
diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl
index ef5a70ce8e1..bd8f912e9f4 100644
--- a/tests/unit/introspection-2.tcl
+++ b/tests/unit/introspection-2.tcl
@@ -265,6 +265,34 @@ start_server {tags {"introspection"}} {
assert_equal {{}} [r command info config|get|key]
}
+ test {COMMAND INFO subcommands field uses array not set type in RESP3 for commands with no subcommands} {
+ r hello 3
+ r readraw 1
+ r deferred 1
+ r command info ping
+ assert_equal [r read] {*1} ;# outer array: 1 command
+ assert_equal [r read] {*10} ;# command info: 10 fields
+ assert_equal [r read] {$4} ;# name (bulk string type)
+ assert_equal [r read] {ping} ;# name value
+ assert_equal [r read] {:-1} ;# arity
+ set flags_hdr [r read] ;# flags (~N set)
+ for {set i 0} {$i < [string range $flags_hdr 1 end]} {incr i} { r read }
+ assert_equal [r read] {:0} ;# first_key
+ assert_equal [r read] {:0} ;# last_key
+ assert_equal [r read] {:0} ;# step
+ set acl_hdr [r read] ;# acl_categories (~N set)
+ for {set i 0} {$i < [string range $acl_hdr 1 end]} {incr i} { r read }
+ set tips_hdr [r read] ;# tips (~N set)
+ set tips_count [string range $tips_hdr 1 end]
+ for {set i 0} {$i < $tips_count} {incr i} { r read ; r read }
+ assert_equal [r read] {~0} ;# key_specs: correctly a Set
+ set subcommands_hdr [r read] ;# subcommands: should be Array not Set
+ r readraw 0
+ r deferred 0
+ r hello 2
+ assert_equal {*0} $subcommands_hdr
+ } {} {resp3}
+
foreach cmd {SET GET MSET BITFIELD LMOVE LPOP BLPOP PING MEMORY MEMORY|USAGE RENAME GEORADIUS_RO} {
test "$cmd command will not be marked with movablekeys" {
set info [lindex [r command info $cmd] 0]
From 9ba9f6f884e581a25d68a8f11e07b93f8c3a78dd Mon Sep 17 00:00:00 2001
From: lovelypiska
Date: Fri, 12 Jun 2026 17:08:05 +0800
Subject: [PATCH 07/15] Fix off_t to int truncation in bio repl transfer size
reporting (#3811)
`off_t` (64-bit), but were read into `int` (32-bit) locals in
`genValkeyInfoString()` and `handleBioThreadFinishedRDBDownload()`.
This causes INFO replication to report negative
`master_sync_total_bytes` during bio disk-based sync when RDB exceeds
2GB.
Fix: change the local variable types from `int` to `off_t`.
Signed-off-by: chx9
---
src/replication.c | 4 ++--
src/server.c | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/src/replication.c b/src/replication.c
index 13cbc4183d6..d6a2e39c995 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -5220,8 +5220,8 @@ void handleBioThreadFinishedRDBDownload(void) {
debugServerAssert(bio_save_state == REPL_BIO_DISK_SAVE_STATE_FINISHED);
/* Bio termination detected - we can get rid of the state vars */
- int bio_repl_transfer_size = server.bio_repl_transfer_size;
- int bio_repl_transfer_read = server.bio_repl_transfer_read;
+ off_t bio_repl_transfer_size = server.bio_repl_transfer_size;
+ off_t bio_repl_transfer_read = server.bio_repl_transfer_read;
resetBioRDBSaveState();
serverLog(LL_NOTICE, "Replica main thread detected RDB download completion in Bio thread");
diff --git a/src/server.c b/src/server.c
index 1d8fade177a..8403c548cc1 100644
--- a/src/server.c
+++ b/src/server.c
@@ -6529,8 +6529,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"replicas_repl_buffer_peak:%zu\r\n", server.pending_repl_data.peak));
if (server.repl_state == REPL_STATE_TRANSFER) {
- int repl_transfer_size_stat;
- int repl_transfer_read_stat;
+ off_t repl_transfer_size_stat;
+ off_t repl_transfer_read_stat;
if (atomic_load_explicit(&server.replica_bio_disk_save_state, memory_order_acquire) != REPL_BIO_DISK_SAVE_STATE_NONE) {
repl_transfer_size_stat = server.bio_repl_transfer_size;
repl_transfer_read_stat = server.bio_repl_transfer_read;
From 84f723cc30e53528f7c1de4eb29dc4a1fc098076 Mon Sep 17 00:00:00 2001
From: pkhartsk
Date: Wed, 24 Jun 2026 14:10:15 +0200
Subject: [PATCH 08/15] Increase max proctitle length from 255 to 1024 (#3843)
255 is too short if valkey-server is being run from a long path,
especially if many fields are included such as in the "Process title set
as expected" test in `unit/other.tcl`, where the max length for the
cmdline is 1024, so having the same number in both makes sense.
Closes #3832.
Signed-off-by: Petr Khartskhaev
---
src/setproctitle.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/setproctitle.c b/src/setproctitle.c
index 952cf340ad4..bab01fb3c48 100644
--- a/src/setproctitle.c
+++ b/src/setproctitle.c
@@ -269,7 +269,7 @@ void spt_init(int argc, char *argv[]) {
#ifndef SPT_MAXTITLE
-#define SPT_MAXTITLE 255
+#define SPT_MAXTITLE 1024
#endif
void setproctitle(const char *fmt, ...) {
From 64c32d2641b34743744f730cf581f6405b6ff50a Mon Sep 17 00:00:00 2001
From: Sarthak Aggarwal
Date: Wed, 24 Jun 2026 09:30:10 -0700
Subject: [PATCH 09/15] Reduce io-threads modifiability test iterations under
Valgrind (#3980)
The `test io-threads are runtime modifiable` test in
`tests/unit/other.tcl` times out on the dedicated Valgrind jobs of the
daily CI, failing the run. The failing test is introduced in #3938.
This PR reduces the loop to 10 iterations under Valgrind.
**Failure links:**
- https://github.com/valkey-io/valkey/actions/runs/27386948127 (Jun 12)
- https://github.com/valkey-io/valkey/actions/runs/27315974006 (Jun 11)
- https://github.com/valkey-io/valkey/actions/runs/27245311034 (Jun 10)
---------
Signed-off-by: Sarthak Aggarwal
---
tests/support/server.tcl | 5 +++--
tests/test_helper.tcl | 20 ++++++++++++++++++--
tests/unit/other.tcl | 4 +++-
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index 832f3ca8654..65afff98907 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -364,8 +364,9 @@ proc spawn_server {executable config_file stdout stderr args} {
read stdin 1
}
- # Tell the test server about this new instance.
- send_data_packet $::test_server_fd server-spawned "$pid - $::curfile"
+ # Tell the test server about this new instance. Send the log path too so
+ # the orchestrator can dump it if the test times out.
+ send_data_packet $::test_server_fd server-spawned [list $pid $stdout $::curfile]
return $pid
}
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index c70f5c6395a..d95a54c036b 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -78,6 +78,7 @@ set ::failures_output_file ""; # If set, write failures JSON to this path
set ::timeout 1200; # 20 minutes without progresses will quit the test.
set ::last_progress [clock seconds]
set ::active_servers {} ; # Pids of active server instances.
+array set ::server_logs {} ; # Maps server pid -> stdout log path.
set ::dont_clean 0
set ::dont_pre_clean 0
set ::wait_server 0
@@ -418,6 +419,7 @@ proc test_server_cron {} {
}
}
show_clients_state
+ dump_server_logs
force_kill_all_servers
kill_clients
the_end
@@ -509,13 +511,15 @@ proc read_from_test_client fd {
} elseif {$status eq {server-spawning}} {
set ::active_clients_task($fd) "(SPAWNING SERVER) $data"
} elseif {$status eq {server-spawned}} {
- set pid [string trim [lindex [split $data "-"] 0]]
+ set pid [string trim [lindex $data 0]]
+ set ::server_logs($pid) [lindex $data 1]
lappend ::active_servers $pid
- set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data"
+ set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$pid"
} elseif {$status eq {server-killing}} {
set ::active_clients_task($fd) "(KILLING SERVER) pid:$data"
} elseif {$status eq {server-killed}} {
set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data]
+ unset -nocomplain ::server_logs($data)
set ::active_clients_task($fd) "(KILLED SERVER) pid:$data"
} elseif {$status eq {run_solo}} {
lappend ::run_solo_tests $data
@@ -545,6 +549,18 @@ proc kill_clients {} {
}
}
+# Print the tail of each still-running server's log. Called on timeout so the
+# log of the stuck server is visible in CI output for troubleshooting.
+proc dump_server_logs {} {
+ foreach pid $::active_servers {
+ if {![info exists ::server_logs($pid)]} continue
+ set log $::server_logs($pid)
+ if {![file exists $log]} continue
+ puts "\n=== Server log (pid $pid): $log ==="
+ catch {puts [exec tail -n 100 $log]}
+ }
+}
+
proc force_kill_all_servers {} {
foreach p $::active_servers {
puts "Killing still running Valkey server $p"
diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl
index 46f4b1b7bbe..529b6ca4eca 100644
--- a/tests/unit/other.tcl
+++ b/tests/unit/other.tcl
@@ -585,8 +585,10 @@ start_server {tags {"other external:skip"}} {
start_server {tags {"other external:skip"}} {
test "test io-threads are runtime modifiable" {
+ # Each toggle spawns/joins real pthreads; too slow for 100 iterations under Valgrind.
+ set iterations [expr {$::valgrind ? 10 : 100}]
# Randomly set the number of threads between 1 and 5
- for {set i 0} {$i < 100} {incr i} {
+ for {set i 0} {$i < $iterations} {incr i} {
set random_num [expr {int(rand() * 5) + 1}]
r config set io-threads $random_num
set thread_num [lindex [r config get io-threads] 1]
From 9b4767b6eb81a0caf0b37c04d4b6c6cfe36a7bb7 Mon Sep 17 00:00:00 2001
From: lcxn123 <156999965+lcxn123@users.noreply.github.com>
Date: Thu, 25 Jun 2026 20:55:31 +0800
Subject: [PATCH 10/15] Fix HGETDEL FIELDS token validation (#4049)
Add explicit validation for the FIELDS token before parsing numfields.
Fixes #4045.
Signed-off-by: lcxn123 <156999965+lcxn123@users.noreply.github.com>
Co-authored-by: Binbin
---
src/t_hash.c | 5 +++++
tests/unit/type/hash.tcl | 2 +-
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/src/t_hash.c b/src/t_hash.c
index 821b43bb4ac..f7d197e1399 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -1162,6 +1162,11 @@ void hgetdelCommand(client *c) {
long long num_fields = 0;
bool keyremoved = false;
+ if (strcasecmp(objectGetVal(c->argv[fields_index - 2]), "fields")) {
+ addReplyErrorObject(c, shared.syntaxerr);
+ return;
+ }
+
if (getLongLongFromObjectOrReply(c, c->argv[fields_index - 1], &num_fields, NULL) != C_OK) return;
/* Check that the parsed fields number matches the real provided number of fields */
diff --git a/tests/unit/type/hash.tcl b/tests/unit/type/hash.tcl
index 1b519652048..a089d5bb9c7 100644
--- a/tests/unit/type/hash.tcl
+++ b/tests/unit/type/hash.tcl
@@ -512,7 +512,7 @@ start_server {tags {"hash"}} {
}
test {HGETDEL - check for syntax and type errors} {
- assert_error "*value is not an integer or out of range" {r hgetdel myhash a b c}
+ assert_error "*ERR syntax error" {r hgetdel myhash a 1 c}
assert_error "*value is not an integer or out of range" {r hgetdel myhash FIELDS a b c}
assert_error "*numfields should be greater than 0 and match the provided number of fields" {r hgetdel myhash FIELDS 2 a b c}
assert_error "*numfields should be greater than 0 and match the provided number of fields" {r hgetdel myhash FIELDS 4 a b c}
From d9c38d207f09aafb5225b407063edae189ea5993 Mon Sep 17 00:00:00 2001
From: pkhartsk
Date: Fri, 26 Jun 2026 23:47:22 +0200
Subject: [PATCH 11/15] Fix build warnings with OpenSSL 4.0 (#4016)
Some functions are newly deprecated in OpenSSL 4.0. This commit works
around those. More specifically:
- X509_cmp_current_time() in isCertValid() is no longer necessary as
X509_check_certificate_times() compares the notBefore and notAfter on
its own. Had to add a version check since this function is new in
OpenSSL 4.0.
- Replace deprecated X509_NAME_get_text_by_NID. Not a perfect fix,
because the new implementation still assumes that the name does not
contain embedded null characters which may not be true, e.g., if the
name is of type UniversalString or BMPString.
- Also fix constness of X509_get_subject_name return value.
The latter two fixes are taken from this Fedora patch:
https://src.fedoraproject.org/rpms/valkey/c/1a9c8847172ef3fb116a1e2fdb3871692378adae?branch=rawhide
Closes #4012
Signed-off-by: Petr Khartskhaev
---
src/tls.c | 27 +++++++++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
diff --git a/src/tls.c b/src/tls.c
index e0ec59d6fc7..6e8dda0d04d 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -1181,7 +1181,7 @@ static void updateSSLState(connection *conn_) {
}
static int getCertSubjectFieldByName(X509 *cert, const char *field, char *out, size_t outlen) {
- if (!cert || !field || !out) return 0;
+ if (!cert || !field || !out || outlen == 0) return 0;
int nid = -1;
@@ -1193,10 +1193,33 @@ static int getCertSubjectFieldByName(X509 *cert, const char *field, char *out, s
if (nid == -1) return 0;
+#if OPENSSL_VERSION_NUMBER >= 0x30000000L
+ const X509_NAME *subject = X509_get_subject_name(cert);
+#else
X509_NAME *subject = X509_get_subject_name(cert);
+#endif
if (!subject) return 0;
- return X509_NAME_get_text_by_NID(subject, nid, out, outlen) > 0;
+ /* X509_NAME_get_text_by_NID is deprecated in OpenSSL 4.0 */
+ int idx = X509_NAME_get_index_by_NID(subject, nid, -1);
+ if (idx < 0) return 0;
+
+ const X509_NAME_ENTRY *entry = X509_NAME_get_entry(subject, idx);
+ if (!entry) return 0;
+
+ const ASN1_STRING *data = X509_NAME_ENTRY_get_data(entry);
+ if (!data) return 0;
+
+ const unsigned char *str = ASN1_STRING_get0_data(data);
+ int len = ASN1_STRING_length(data);
+ if (!str || len <= 0) return 0;
+
+ /* Copy to output buffer, ensuring null termination */
+ size_t copy_len = (size_t)len < outlen - 1 ? (size_t)len : outlen - 1;
+ memcpy(out, str, copy_len);
+ out[copy_len] = '\0';
+
+ return 1;
}
/* Extract URI from Subject Alternative Name extension and return the first
From 64941bacb536fd48d71b6520b2af8c04a4c92764 Mon Sep 17 00:00:00 2001
From: cjx-zar <56825069+cjx-zar@users.noreply.github.com>
Date: Tue, 30 Jun 2026 10:14:43 +0800
Subject: [PATCH 12/15] Fix HRANDFIELD CASE 4 infinite loop when valid fields
fewer than count (#4047)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
### Problem
`HRANDFIELD` with a positive `count` can enter an infinite loop (CASE 4)
when the number of non-expired fields in a hash is less than `count`.
**Example:**
A hash has 100 fields total: 71 with short TTLs (already expired) and 29
persistent fields. A client runs:
```
HRANDFIELD myhash 30
```
Since `count * 3 = 90 < 100 = size`, CASE 4 is selected.
The loop calls `hashTypeRandomElement()`, which skips expired fields
(via `validateEntry`), so it always returns one of the 29 valid fields.
`hashTypeRandomElement()` retries up to 100 times internally to find a
non-expired field. However, as long as any valid fields exist, this
retry limit is rarely exhausted.
After collecting all 29 unique valid fields, every subsequent
`hashtableAdd()` returns false (duplicate), but `added = 29 < count =
30`, so the loop never breaks.
This is reproducible when active expiry hasn't cleaned up the expired
fields yet (e.g. on a replica, or during a burst of expirations).
### Fix
Add a `maxiter = count * 10` limit to the CASE 4 loop. When the
iteration budget is exhausted, return whatever unique fields have been
collected so far — returning fewer results is acceptable.
Signed-off-by: cjx-zar
Signed-off-by: Binbin
Co-authored-by: Binbin
---
src/t_hash.c | 3 ++-
tests/unit/hashexpire.tcl | 23 +++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/src/t_hash.c b/src/t_hash.c
index f7d197e1399..7144ff2330b 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -2315,10 +2315,11 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
else {
/* Hashtable encoding (generic implementation) */
unsigned long added = 0;
+ unsigned long maxtries = (count > ULONG_MAX / 10) ? ULONG_MAX : count * 10;
listpackEntry field, value;
hashtable *ht = hashtableCreate(&setHashtableType);
hashtableExpand(ht, count);
- while (added < count) {
+ while (added < count && maxtries--) {
/* In case we were unable to locate random element, it is probably because there is no such element
* since all elements are expired. */
if (hashTypeRandomElement(hash, size, &field, withvalues ? &value : NULL) != C_OK)
diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl
index 00b983cad7a..61f5a549efc 100644
--- a/tests/unit/hashexpire.tcl
+++ b/tests/unit/hashexpire.tcl
@@ -1540,6 +1540,29 @@ start_server {tags {"hashexpire"}} {
}
}
+ test "HRANDFIELD - CASE 4: does not loop forever when valid fields fewer than count" {
+ r FLUSHALL
+ r DEBUG SET-ACTIVE-EXPIRE 0
+
+ # 71 expired fields + 29 valid fields = 100 total
+ # count=30, count*3=90 < 100 -> CASE 4
+ for {set i 1} {$i <= 71} {incr i} {
+ r HSETEX myhash PX 1 FIELDS 1 f$i v$i
+ }
+ for {set i 72} {$i <= 100} {incr i} {
+ r HSET myhash f$i v$i
+ }
+
+ # Wait for fields to expire
+ after 100
+ assert_equal 100 [r HLEN myhash]
+
+ # Should return at most 29 valid fields without
+ assert_lessthan_equal [llength $result] 29
+
+ r DEBUG SET-ACTIVE-EXPIRE 1
+ } {OK} {needs:debug}
+
test "HRANDFIELD - returns null response when all fields are expired" {
r FLUSHALL
r DEBUG SET-ACTIVE-EXPIRE 0
From 083650b218593f5ffbb27063f899820df9af1840 Mon Sep 17 00:00:00 2001
From: Binbin
Date: Tue, 30 Jun 2026 10:22:04 +0800
Subject: [PATCH 13/15] Restore client's selected DB after module keyspace
notification (#4024)
moduleNotifyKeyspaceEvent() runs subscriber callbacks on the executing
client and calls selectDb(dbid) on it, but never restores the DB. When
dbid differs from the client's current DB (e.g. MOVE/COPY notify on the
destination DB), the client is left on the wrong DB, breaking subsequent
commands.
This was introduced by #1819, which started reusing the executing client
instead of a temporary one. Save the executing client's DB before the
subscriber loop and restore it afterwards. Covers both MOVE and COPY.
Signed-off-by: Binbin
---
src/module.c | 11 ++++++++
tests/unit/moduleapi/keyspace_events.tcl | 36 ++++++++++++++++++++++++
2 files changed, 47 insertions(+)
diff --git a/src/module.c b/src/module.c
index c716f942413..1ea966ee1d1 100644
--- a/src/module.c
+++ b/src/module.c
@@ -9494,6 +9494,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
/* Remove irrelevant flags from the type mask */
type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
+ /* When notifying via the executing client, the callbacks below select
+ * 'dbid' on its context. 'dbid' may differ from the client's currently
+ * selected DB (e.g. MOVE/COPY notify on the destination DB), so save the
+ * original DB and restore it afterwards to avoid leaving the client on the
+ * wrong DB for subsequent commands. */
+ client *executing_client = server.executing_client;
+ int origin_dbid = (executing_client != NULL) ? executing_client->db->id : -1;
+
while ((ln = listNext(&li))) {
ValkeyModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching the registration,
@@ -9523,6 +9531,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
}
}
+ /* Restore the executing client's originally selected DB. */
+ if (executing_client != NULL) selectDb(executing_client, origin_dbid);
+
exitExecutionUnit();
}
diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl
index 9c1cfa8ba4d..604fe9981ec 100644
--- a/tests/unit/moduleapi/keyspace_events.tcl
+++ b/tests/unit/moduleapi/keyspace_events.tcl
@@ -94,6 +94,42 @@ tags "modules" {
assert_equal [r get testkeyspace:expired] 1
}
+ test {MOVE restores client DB after module keyspace notification} {
+ # MOVE fires move_from/move_to notifications on the source and
+ # destination DBs. The module subscribes to NOTIFY_GENERIC, and
+ # the bug left the client selected on the destination DB.
+ r flushall
+ r select 0
+ r set movekey value
+
+ # The client must stay on its original DB (c->db unchanged).
+ assert_equal 1 [r move movekey 10]
+ assert_match {*db=0*} [r client info]
+
+ # If the client was still on db10, this would fail with
+ # "source and destination objects are the same" instead of 0.
+ assert_equal 0 [r move movekey 10]
+ assert_match {*db=0*} [r client info]
+ } {} {singledb:skip}
+
+ test {COPY restores client DB after module keyspace notification} {
+ # COPY fires a copy_to notification on the destination DB. The
+ # module subscribes to NOTIFY_GENERIC, and the bug left the client
+ # selected on the destination DB.
+ r flushall
+ r select 0
+ r set copykey value
+
+ # The client must stay on its original DB (c->db unchanged).
+ assert_equal 1 [r copy copykey copykey DB 10]
+ assert_match {*db=0*} [r client info]
+
+ # If the client was still on db10, this would fail with
+ # "source and destination objects are the same" instead of 0
+ assert_equal 0 [r copy copykey copykey DB 10]
+ assert_match {*db=0*} [r client info]
+ } {} {singledb:skip}
+
test "Unload the module - testkeyspace" {
assert_equal {OK} [r module unload testkeyspace]
}
From 7b194db6b2b4e88f0ec8565d6eb27ccf49ed8a37 Mon Sep 17 00:00:00 2001
From: Saurabh K
Date: Tue, 30 Jun 2026 16:30:18 -0700
Subject: [PATCH 14/15] fix: Reject corrupt stream RDB with shared NACK across
consumers (#4073)
When loading a stream consumer group from RDB, the loader assigns each
NACK's consumer pointer without checking if it was already set. A
corrupt RDB payload can list the same message ID under multiple
consumers' PELs, resulting in a shared NACK. This leads to
use-after-free when one consumer is deleted while another still
references the same NACK.
Add a check that nack->consumer is NULL before assigning. If already
set, reject the RDB as corrupt.
Signed-off-by: Saurabh Kher
---
src/rdb.c | 6 ++++++
tests/unit/dump.tcl | 34 ++++++++++++++++++++++++++++++++++
2 files changed, 40 insertions(+)
diff --git a/src/rdb.c b/src/rdb.c
index aefb3707486..b2b78660d16 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -2881,6 +2881,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd
/* Set the NACK consumer, that was left to NULL when
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
+ if (nack->consumer && nack->consumer != consumer) {
+ rdbReportCorruptRDB("NACK already assigned to a different consumer, "
+ "shared NACKs across consumers are not valid");
+ decrRefCount(o);
+ return NULL;
+ }
nack->consumer = consumer;
if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) {
rdbReportCorruptRDB("Duplicated consumer PEL entry "
diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl
index a3b4cd13abe..e03e30c1fb8 100644
--- a/tests/unit/dump.tcl
+++ b/tests/unit/dump.tcl
@@ -470,4 +470,38 @@ start_server {tags {"dump"}} {
r debug set-skip-checksum-validation 0
assert_match {*Bad data format*} $err
} {} {needs:debug}
+
+ test {RESTORE rejects stream with shared NACK across consumers} {
+ # Create a valid stream with consumer group and two consumers
+ r DEL mystream
+ r XADD mystream 1-1 f v
+ r XADD mystream 2-1 f v
+ r XGROUP CREATE mystream grp 0
+ r XREADGROUP GROUP grp consumer1 COUNT 1 STREAMS mystream ">"
+ r XREADGROUP GROUP grp consumer2 COUNT 1 STREAMS mystream ">"
+
+ set dump [r DUMP mystream]
+ r DEL mystream
+
+ # In the dump, consumer2's PEL entry contains message ID 2-1.
+ # Replace it with 1-1 (same as consumer1's) to create a shared NACK.
+ # Message ID 2-1 in big-endian: \x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01
+ # Message ID 1-1 in big-endian: \x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01
+ # Find the last occurrence of ID 2-1 (in consumer2's PEL) and replace with 1-1
+ set id_2_1 "\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01"
+ set id_1_1 "\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01"
+
+ # Find the last occurrence (consumer2's PEL entry, not the global PEL)
+ set last_pos [string last $id_2_1 $dump]
+ if {$last_pos == -1} {
+ fail "Could not find message ID 2-1 in dump"
+ }
+ set corrupt_dump [string replace $dump $last_pos [expr {$last_pos + 15}] $id_1_1]
+
+ # Skip CRC validation since we modified the payload
+ r debug set-skip-checksum-validation 1
+ catch {r RESTORE mystream 0 $corrupt_dump} err
+ r debug set-skip-checksum-validation 0
+ assert_match {*Bad data format*} $err
+ } {} {needs:debug}
}
From cfc72d49c9e76fff3eb233f57c463e79f9d6844d Mon Sep 17 00:00:00 2001
From: Luke Palmer
Date: Wed, 1 Jul 2026 00:27:49 -0400
Subject: [PATCH 15/15] Fix sentinel failover coordinated segfault when old
leader's client is disconnected (#4068)
Prevent a segfault that can happen during a sentinel coordinated
failover (a new feature introduced in #1292) that can happen when
the coordinated failover attempting to clean up the former leader
races with that leader disconnecting.
Add a check for `link->cc` in `sentinelKillClients`. Otherwise the
sentinel node will segfault in this case.
Fixes #4066.
Signed-off-by: Luke Palmer
Signed-off-by: Binbin
Co-authored-by: Binbin
---
src/sentinel.c | 2 +
.../19-coordinated-failover-killed-link.tcl | 67 +++++++++++++++++++
2 files changed, 69 insertions(+)
create mode 100644 tests/sentinel/tests/19-coordinated-failover-killed-link.tcl
diff --git a/src/sentinel.c b/src/sentinel.c
index 4aa7ce8c885..5cd4ead6a5f 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -4809,6 +4809,8 @@ int sentinelFailoverTo(sentinelValkeyInstance *ri, const sentinelAddr *addr, mst
int sentinelKillClients(sentinelValkeyInstance *ri) {
int retval;
+ if (ri->link->cc == NULL) return C_ERR;
+
/* 1) Rewrite the configuration (the instance just switched roles)
* 2) Disconnect all clients (but this one sending the command) in order
* to trigger the ask-master-on-reconnection protocol for connected
diff --git a/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl b/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl
new file mode 100644
index 00000000000..1dd6f8a9240
--- /dev/null
+++ b/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl
@@ -0,0 +1,67 @@
+# Regression test for coordinated failover when the Sentinel command link to
+# the old primary is disconnected just as the promoted replica reports its new
+# role. In that window Sentinel must not dereference a NULL command link while
+# doing its best-effort client cleanup on the old primary.
+
+source "../tests/includes/init-tests.tcl"
+
+foreach_sentinel_id id {
+ S $id sentinel debug info-period 1000
+ S $id sentinel debug publish-period 100
+ S $id sentinel debug ping-period 100
+}
+
+proc sentinel_cmd_client_name {sentinel_id} {
+ set myid [S $sentinel_id SENTINEL MYID]
+ return "sentinel-[string range $myid 0 7]-cmd"
+}
+
+test "Coordinated failover tolerates old primary command link disconnect" {
+ set sentinel_id 0
+ set old_master_id $master_id
+ set old_port [RPort $old_master_id]
+ set old_addr [S $sentinel_id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster]
+ assert {[lindex $old_addr 1] == $old_port}
+
+ set sentinel_client_name [sentinel_cmd_client_name $sentinel_id]
+
+ wait_for_condition 1000 50 {
+ [string match "*name=$sentinel_client_name*" [R $old_master_id CLIENT LIST]]
+ } else {
+ fail "Sentinel command client $sentinel_client_name was not connected to the old primary"
+ }
+
+ wait_for_condition 300 50 {
+ [catch {S $sentinel_id SENTINEL FAILOVER mymaster COORDINATED}] == 0
+ } else {
+ catch {S $sentinel_id SENTINEL FAILOVER mymaster COORDINATED} reply
+ fail "Sentinel manual coordinated failover did not start, got: $reply"
+ }
+
+ wait_for_condition 1000 10 {
+ [dict get [S $sentinel_id SENTINEL PRIMARY mymaster] failover-state] eq {wait_promotion}
+ } else {
+ fail "Sentinel did not reach wait_promotion state"
+ }
+
+ # Keep the command connection from this Sentinel to the old primary closed
+ # while the next INFO reply from the promoted replica advances the failover.
+ # Before the fix, sentinelKillClients(old-primary) calls
+ # valkeyAsyncCommand(NULL, ...) in this window, and the Sentinel crashes.
+ S $sentinel_id sentinel debug ping-period 10000
+ catch {R $old_master_id CLIENT KILL NAME $sentinel_client_name}
+
+ wait_for_condition 1000 10 {
+ [string match "*disconnected*" [dict get [S $sentinel_id SENTINEL PRIMARY mymaster] flags]]
+ } else {
+ fail "Sentinel command link to the old primary did not disconnect"
+ }
+
+ assert_equal {PONG} [S $sentinel_id PING]
+
+ wait_for_condition 1000 50 {
+ [lindex [S $sentinel_id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster] 1] != $old_port
+ } else {
+ fail "Sentinel did not complete the coordinated failover"
+ }
+}