diff --git a/src/Makefile b/src/Makefile index 3b81517f4d4..2812c4dbad7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -582,7 +582,7 @@ ENGINE_SERVER_OBJ = \ ziplist.o \ zipmap.o \ zmalloc.o \ - queues.o + queues.o ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ) ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ = \ diff --git a/src/acl.c b/src/acl.c index 74ce448d334..e2765ba716b 100644 --- a/src/acl.c +++ b/src/acl.c @@ -869,7 +869,7 @@ static sds ACLDescribeSelector(aclSelector *selector) { /* Database permissions. */ if (selector->flags & SELECTOR_FLAG_ALLDBS) { - res = sdscatlen(res, "alldbs ", 7); + /* alldbs is default, avoid emitting it in ACL strings for compatibility. */ } else if (intsetLen(selector->dbs) == 0) { res = sdscatlen(res, "resetdbs ", 9); } else { diff --git a/src/io_threads.c b/src/io_threads.c index ca7d142fd9b..d6368f5524e 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -8,6 +8,10 @@ #include "queues.h" #include +#define IO_MPSC_QUEUE_SIZE 16384 +#define IO_SPMC_QUEUE_SIZE 4096 +#define IO_SPSC_QUEUE_SIZE 4096 + static _Thread_local int thread_id = 0; static _Thread_local mpscTicket io_thread_ticket = {0}; /* Backlog of responses when io_shared_outbox is full. Should be rare. */ @@ -44,8 +48,8 @@ static inline void untagJob(void *tagged_ptr, void **ptr, int *type) { /* Handler prototypes */ void ioThreadReadQueryFromClient(client *c); void ioThreadWriteToClient(client *c); -void IOThreadFreeArgv(robj **argv); -void IOThreadPoll(aeEventLoop *el); +void ioThreadFreeArgv(robj **argv); +void ioThreadPoll(aeEventLoop *el); static void ioThreadAccept(client *c); int inMainThread(void) { @@ -107,9 +111,6 @@ void waitForClientIO(client *c) { } void IOThreadsBeforeSleep(long long current_time) { -#ifndef RUSAGE_THREAD - UNUSED(current_time); -#endif if (server.io_threads_num == 1) return; serverAssert(inMainThread()); @@ -126,29 +127,23 @@ void IOThreadsBeforeSleep(long long current_time) { } } -#ifdef RUSAGE_THREAD - /* If threads are not active track main thread CPU time for ignition decision */ + /* If threads are not active, track main-thread active time for ignition decision */ if (server.active_io_threads_num == 1) { static long long last_measurement_time = 0; if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */ last_measurement_time = current_time; - struct rusage ru; - if (getrusage(RUSAGE_THREAD, &ru) == 0) { - long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec; - long long user_time_us = ru.ru_utime.tv_sec * 1000000LL + ru.ru_utime.tv_usec; - trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000); - trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER, user_time_us, current_time, 1000000); - } + trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, server.stat_active_time, current_time, 1000000); } -#endif } #define IO_COOLDOWN_MS 1000 #define IO_SAMPLE_RATE_MS 10 #define IO_IGNITION_EVENTS 4 -#define IO_IGNITION_CPU_SYS 30.0 -#define IO_IGNITION_CPU_SYS_LOW 5.0 -#define IO_IGNITION_CPU_USER 50.0 +/* Start using I/O threads when the main thread is active for more than the below + * defined percentage of the time. This number is picked somewhat arbitrarily but + * needed to be low enough to make sure we start the next thread quickly while not + * starting too many threads unnecessarily to avoid contention. */ +#define IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT 30 #define BATCH_SIZE 32 void IOThreadsAfterSleep(int numevents) { @@ -171,15 +166,9 @@ void IOThreadsAfterSleep(int numevents) { /* Ignition Policy */ if (server.active_io_threads_num == 1) { int should_ignite = 0; -#ifdef RUSAGE_THREAD - float cpu_sys = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS) / 10000.0; - float cpu_user = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER) / 10000.0; - /* Ignite IO threads if sys CPU > 30%, or if sys CPU > 5% and user CPU > 50% */ - should_ignite = (cpu_sys > IO_IGNITION_CPU_SYS) || - (cpu_sys > IO_IGNITION_CPU_SYS_LOW && cpu_user > IO_IGNITION_CPU_USER); -#else - should_ignite = (numevents >= IO_IGNITION_EVENTS); -#endif + float main_thread_active_time = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME) / 10000.0; + /* Ignite IO threads when main-thread active time exceeds the threshold (30%) */ + should_ignite = (main_thread_active_time > (float)IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT); if (should_ignite) { pthread_mutex_unlock(&io_threads_mutex[1]); server.active_io_threads_num++; @@ -243,7 +232,7 @@ void IOThreadsAfterSleep(int numevents) { /* This function performs polling on the given event loop and updates the server's * IO fired events count and poll state. */ -void IOThreadPoll(aeEventLoop *el) { +void ioThreadPoll(aeEventLoop *el) { struct timeval tvp = {0, 0}; int num_events = aePoll(el, &tvp); server.io_ae_fired_events = num_events; @@ -326,10 +315,10 @@ static void *IOThreadMain(void *myid) { switch (type) { case JOB_REQ_FREE_ARGV: - IOThreadFreeArgv((robj **)data); + ioThreadFreeArgv((robj **)data); break; case JOB_REQ_POLL: - IOThreadPoll((aeEventLoop *)data); + ioThreadPoll((aeEventLoop *)data); break; default: serverPanic("Invalid SPSC job type: %d", type); @@ -338,10 +327,8 @@ static void *IOThreadMain(void *myid) { processed += batch_count; } - /* - * PRIORITY 2: Shared Global Queue (SPMC) - * Only checked after SPSC is drained. - */ + /* PRIORITY 2: Shared Global Queue (SPMC) + * Only checked after SPSC is drained. */ void *tagged_job = spmcDequeue(&io_shared_inbox); if (tagged_job) { void *data; @@ -362,7 +349,7 @@ static void *IOThreadMain(void *myid) { ioThreadAccept((client *)data); break; case JOB_REQ_POLL: - IOThreadPoll((aeEventLoop *)data); + ioThreadPoll((aeEventLoop *)data); break; default: serverPanic("Invalid SPMC job type: %d", type); @@ -398,7 +385,7 @@ static void createIOThread(int id) { serverAssert(id > 0 && id < server.io_threads_num); /* Initialize the private SPSC queue for this thread */ - spscInit(&io_private_inbox[id]); + spscInit(&io_private_inbox[id], IO_SPSC_QUEUE_SIZE); pthread_t tid; pthread_mutex_init(&io_threads_mutex[id], NULL); @@ -458,7 +445,7 @@ int updateIOThreads(const char **err) { * in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */ size_t pending = getPendingIOResponsesCount(); - if (pending > MPSC_QUEUE_SIZE) { + if (pending > io_shared_outbox.queue_size) { if (err) *err = "Can't update IO threads under load, try again later"; return 0; } @@ -497,8 +484,8 @@ void initIOThreads(int prev_threads_num) { server.active_io_threads_num = 1; /* We start with threads not active. */ server.io_poll_state = AE_IO_STATE_NONE; server.io_ae_fired_events = 0; - spmcInit(&io_shared_inbox); - mpscInit(&io_shared_outbox); + spmcInit(&io_shared_inbox, IO_SPMC_QUEUE_SIZE); + mpscInit(&io_shared_outbox, IO_MPSC_QUEUE_SIZE); io_jobs_submitted = 0; atomic_init(&io_jobs_finished, 0); prefetchCommandsBatchInit(); @@ -560,6 +547,7 @@ int trySendWriteToIOThreads(client *c) { if (c->flag.lua_debug) return C_ERR; int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA; + clientReplyBlock *block = NULL; if (is_replica) { c->io_last_reply_block = listLast(server.repl_buffer_blocks); replBufBlock *o = listNodeValue(c->io_last_reply_block); @@ -571,14 +559,10 @@ int trySendWriteToIOThreads(client *c) { * threads from reading data that might be invalid in their local CPU cache. */ c->io_last_reply_block = listLast(c->reply); if (c->io_last_reply_block) { - clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block); + block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block); c->io_last_bufpos = block->used; - /* If buffer is encoded force new header */ - if (block->flag.buf_encoded) block->last_header = NULL; } else { c->io_last_bufpos = (size_t)c->bufpos; - /* If buffer is encoded force new header */ - if (c->flag.buf_encoded) c->last_header = NULL; } } @@ -597,7 +581,15 @@ int trySendWriteToIOThreads(client *c) { c->io_last_bufpos = 0; return C_ERR; } - + /* Force new header after successful enqueue so the main thread doesn't + * extend a header the I/O thread is currently reading. */ + if (!is_replica) { + if (block) { + if (block->flag.buf_encoded) block->last_header = NULL; + } else { + if (c->flag.buf_encoded) c->last_header = NULL; + } + } if (c->flag.pending_write) { listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node); c->flag.pending_write = 0; @@ -609,7 +601,7 @@ int trySendWriteToIOThreads(client *c) { } /* Internal function to free the client's argv in an IO thread. */ -void IOThreadFreeArgv(robj **argv) { +void ioThreadFreeArgv(robj **argv) { int last_arg = 0; for (int i = 0;; i++) { robj *o = argv[i]; diff --git a/src/io_threads.h b/src/io_threads.h index 455079d4ade..4202f6508a2 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -12,16 +12,14 @@ typedef enum { JOB_REQ_ACCEPT, JOB_REQ_COUNT } JobRequest; -_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic"); +_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 8 for pointer arithmetic"); typedef enum { JOB_RES_READ_CLIENT = 0, JOB_RES_WRITE_CLIENT, JOB_RES_COUNT } JobResult; -_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic"); - -typedef void (*job_handler)(void *); +_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 8 for pointer arithmetic"); void initIOThreads(int prev_threads_num); void killIOThreads(void); diff --git a/src/module.c b/src/module.c index c716f942413..1ea966ee1d1 100644 --- a/src/module.c +++ b/src/module.c @@ -9494,6 +9494,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) /* Remove irrelevant flags from the type mask */ type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE); + /* When notifying via the executing client, the callbacks below select + * 'dbid' on its context. 'dbid' may differ from the client's currently + * selected DB (e.g. MOVE/COPY notify on the destination DB), so save the + * original DB and restore it afterwards to avoid leaving the client on the + * wrong DB for subsequent commands. */ + client *executing_client = server.executing_client; + int origin_dbid = (executing_client != NULL) ? executing_client->db->id : -1; + while ((ln = listNext(&li))) { ValkeyModuleKeyspaceSubscriber *sub = ln->value; /* Only notify subscribers on events matching the registration, @@ -9523,6 +9531,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) } } + /* Restore the executing client's originally selected DB. */ + if (executing_client != NULL) selectDb(executing_client, origin_dbid); + exitExecutionUnit(); } diff --git a/src/networking.c b/src/networking.c index c758abe4bff..ee24eb2e43c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -6532,31 +6532,16 @@ void evictClients(void) { size_t client_eviction_limit = getClientEvictionLimit(); if (client_eviction_limit == 0) return; - /* Variable to track memory of clients marked for close but not yet freed */ - size_t pending_freed = 0; - while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + - server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] - - pending_freed > + server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] > client_eviction_limit) { listNode *ln = listNext(&bucket_iter); if (ln) { client *c = ln->value; - if (c->flag.close_asap) { - /* Already scheduled to close. Count memory as freed and skip. */ - pending_freed += getClientMemoryUsage(c, NULL); - continue; - } sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log); serverLog(LL_NOTICE, "Evicting client: %s", ci); + if (freeClient(c)) server.stat_evictedclients++; sdsfree(ci); - server.stat_evictedclients++; - - if (freeClient(c) == 0) { - /* Protected client (async close). Count memory as freed and skip. */ - pending_freed += getClientMemoryUsage(c, NULL); - continue; - } } else { curr_bucket--; if (curr_bucket < 0) { diff --git a/src/queues.c b/src/queues.c index d2146d5d9a9..2bb5605d9bb 100644 --- a/src/queues.c +++ b/src/queues.c @@ -4,24 +4,27 @@ * SPDX-License-Identifier: BSD-3-Clause * * Implementation of MPSC, SPMC, and SPSC queues - * */ #include "queues.h" #include "zmalloc.h" -inline void mpscInit(mpscQueue *q) { - q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * MPSC_QUEUE_SIZE); + +void mpscInit(mpscQueue *q, size_t queue_size) { + /* Queue size must be a power of 2 (the masking logic relies on it) */ + assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0); + q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * queue_size); + q->queue_size = queue_size; atomic_init(&q->head, 0); atomic_init(&q->tail, 0); atomic_init(&q->head_cache, 0); q->tail_cache = 0; - for (size_t i = 0; i < MPSC_QUEUE_SIZE; ++i) { + for (size_t i = 0; i < q->queue_size; ++i) { atomic_init(&q->buffer[i], NULL); } } -inline void mpscFree(mpscQueue *q) { +void mpscFree(mpscQueue *q) { if (q->buffer) { zfree(q->buffer); q->buffer = NULL; @@ -32,7 +35,7 @@ inline void mpscFree(mpscQueue *q) { q->tail_cache = 0; } -inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) { +bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) { size_t tail; assert(data); @@ -45,12 +48,12 @@ inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) { /* Check limits (Fullness check) */ size_t head = atomic_load_explicit(&q->head_cache, memory_order_acquire); - if ((tail - head) >= MPSC_QUEUE_SIZE) { + if ((tail - head) >= q->queue_size) { /* Cached limit reached, refresh from actual head */ head = atomic_load_explicit(&q->head, memory_order_acquire); atomic_store_explicit(&q->head_cache, head, memory_order_release); - if (unlikely((tail - head) >= MPSC_QUEUE_SIZE)) { + if (unlikely((tail - head) >= q->queue_size)) { /* Queue is full - Persist reservation for retry */ ticket->index = tail; ticket->has_reservation = true; @@ -59,13 +62,13 @@ inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) { } /* Commit data */ - atomic_store_explicit(&q->buffer[tail & MPSC_QUEUE_MASK], data, memory_order_release); + atomic_store_explicit(&q->buffer[tail & (q->queue_size - 1)], data, memory_order_release); ticket->has_reservation = false; return true; } -inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) { +size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) { size_t popped_count = 0; size_t head = atomic_load_explicit(&q->head, memory_order_relaxed); size_t tail = q->tail_cache; @@ -81,13 +84,13 @@ inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) { if (limit > max_jobs) limit = max_jobs; for (size_t i = 0; i < limit; ++i) { - void *data = atomic_load_explicit(&q->buffer[head & MPSC_QUEUE_MASK], memory_order_relaxed); + void *data = atomic_load_explicit(&q->buffer[head & (q->queue_size - 1)], memory_order_relaxed); /* Stop if slot is reserved but data not yet written */ if (!data) break; jobs_out[popped_count++] = data; - atomic_store_explicit(&q->buffer[head & MPSC_QUEUE_MASK], NULL, memory_order_relaxed); + atomic_store_explicit(&q->buffer[head & (q->queue_size - 1)], NULL, memory_order_relaxed); head++; } @@ -103,19 +106,22 @@ inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) { * SPMC QUEUE (Single-Producer Multi-Consumer) * ========================================================================== */ -inline void spmcInit(spmcQueue *q) { - q->buffer = (spmcCell *)zmalloc_cache_aligned(sizeof(spmcCell) * SPMC_QUEUE_SIZE); +inline void spmcInit(spmcQueue *q, size_t queue_size) { + /* Queue size must be a power of 2 (the masking logic relies on it) */ + assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0); + q->buffer = (spmcCell *)zmalloc_cache_aligned(sizeof(spmcCell) * queue_size); + q->queue_size = queue_size; atomic_init(&q->head, 0); q->tail = 0; q->head_cache = 0; - for (size_t i = 0; i < SPMC_QUEUE_SIZE; i++) { + for (size_t i = 0; i < q->queue_size; i++) { atomic_init(&q->buffer[i].sequence, i); q->buffer[i].data = NULL; } } -inline void spmcFree(spmcQueue *q) { +void spmcFree(spmcQueue *q) { if (q->buffer) { zfree(q->buffer); q->buffer = NULL; @@ -125,7 +131,7 @@ inline void spmcFree(spmcQueue *q) { q->head_cache = 0; } -inline bool spmcIsEmpty(spmcQueue *q) { +bool spmcIsEmpty(spmcQueue *q) { /* Fast path: Check against cached consumer position */ if (q->tail == q->head_cache) { return true; @@ -138,13 +144,13 @@ inline bool spmcIsEmpty(spmcQueue *q) { return q->tail == curr_head; } -inline size_t spmcSize(spmcQueue *q) { +size_t spmcSize(spmcQueue *q) { size_t head = atomic_load_explicit(&q->head, memory_order_relaxed); return (q->tail >= head) ? (q->tail - head) : 0; } -inline bool spmcEnqueue(spmcQueue *q, void *data) { - spmcCell *cell = &q->buffer[q->tail & SPMC_QUEUE_MASK]; +bool spmcEnqueue(spmcQueue *q, void *data) { + spmcCell *cell = &q->buffer[q->tail & (q->queue_size - 1)]; size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); /* Sequence Check: @@ -163,13 +169,13 @@ inline bool spmcEnqueue(spmcQueue *q, void *data) { return true; } -inline void *spmcDequeue(spmcQueue *q) { +void *spmcDequeue(spmcQueue *q) { size_t head = atomic_load_explicit(&q->head, memory_order_relaxed); spmcCell *cell; void *data; while (1) { - cell = &q->buffer[head & SPMC_QUEUE_MASK]; + cell = &q->buffer[head & (q->queue_size - 1)]; size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1); @@ -182,7 +188,7 @@ inline void *spmcDequeue(spmcQueue *q) { data = cell->data; /* Mark slot empty for next generation (pos + size) */ - atomic_store_explicit(&cell->sequence, head + SPMC_QUEUE_SIZE, memory_order_release); + atomic_store_explicit(&cell->sequence, head + q->queue_size, memory_order_release); return data; } } else if (diff < 0) { @@ -199,8 +205,11 @@ inline void *spmcDequeue(spmcQueue *q) { * SPSC QUEUE (Single-Producer Single-Consumer) * ========================================================================== */ -inline void spscInit(spscQueue *q) { - q->buffer = (void **)zmalloc(sizeof(void *) * SPSC_QUEUE_SIZE); +void spscInit(spscQueue *q, size_t queue_size) { + /* Queue size must be a power of 2 (the masking logic relies on it) */ + assert(queue_size > 0 && (queue_size & (queue_size - 1)) == 0); + q->buffer = (void **)zmalloc(sizeof(void *) * queue_size); + q->queue_size = queue_size; atomic_init(&q->head, 0); atomic_init(&q->tail, 0); q->head_cache = 0; @@ -208,7 +217,7 @@ inline void spscInit(spscQueue *q) { q->tail_local = 0; } -inline void spscFree(spscQueue *q) { +void spscFree(spscQueue *q) { if (q->buffer) { zfree(q->buffer); q->buffer = NULL; @@ -220,13 +229,13 @@ inline void spscFree(spscQueue *q) { q->tail_local = 0; } -inline bool spscIsFull(spscQueue *q) { +bool spscIsFull(spscQueue *q) { const size_t curr_tail = q->tail_local; - if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) { + if (curr_tail - q->head_cache >= q->queue_size) { q->head_cache = atomic_load_explicit(&q->head, memory_order_acquire); - if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) { + if (curr_tail - q->head_cache >= q->queue_size) { /* Flush any local changes before reporting full */ if (q->tail_local != q->tail) { atomic_store_explicit(&q->tail, q->tail_local, memory_order_release); @@ -237,8 +246,8 @@ inline bool spscIsFull(spscQueue *q) { return false; } -inline void spscEnqueue(spscQueue *q, void *data, bool commit) { - q->buffer[q->tail_local & SPSC_QUEUE_MASK] = data; +void spscEnqueue(spscQueue *q, void *data, bool commit) { + q->buffer[q->tail_local & (q->queue_size - 1)] = data; q->tail_local++; if (commit) { @@ -246,13 +255,13 @@ inline void spscEnqueue(spscQueue *q, void *data, bool commit) { } } -inline void spscCommit(spscQueue *q) { +void spscCommit(spscQueue *q) { size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); if (q->tail_local == tail) return; atomic_store_explicit(&q->tail, q->tail_local, memory_order_release); } -inline size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) { +size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) { size_t curr_head = atomic_load_explicit(&q->head, memory_order_relaxed); size_t curr_tail_cache = q->tail_cache; @@ -266,13 +275,13 @@ inline size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) { size_t count = (num_jobs < available) ? num_jobs : available; for (size_t i = 0; i < count; ++i) { - jobs_out[i] = q->buffer[(curr_head + i) & SPSC_QUEUE_MASK]; + jobs_out[i] = q->buffer[(curr_head + i) & (q->queue_size - 1)]; } atomic_store_explicit(&q->head, curr_head + count, memory_order_release); return count; } -inline bool spscIsEmpty(spscQueue *q) { +bool spscIsEmpty(spscQueue *q) { /* Fast path */ if (q->tail_local == q->head_cache) { return true; diff --git a/src/queues.h b/src/queues.h index 4b6e4bc53e6..8c10649473a 100644 --- a/src/queues.h +++ b/src/queues.h @@ -37,12 +37,6 @@ * MPSC QUEUE (Multi-Producer Single-Consumer) * ========================================================================== */ -#define MPSC_QUEUE_SIZE 16384 -#define MPSC_QUEUE_MASK (MPSC_QUEUE_SIZE - 1) -#ifndef __cplusplus -static_assert((MPSC_QUEUE_SIZE & (MPSC_QUEUE_SIZE - 1)) == 0, "MPSC_QUEUE_SIZE must be power of 2"); -#endif - typedef struct mpscTicket { size_t index; bool has_reservation; @@ -59,9 +53,12 @@ typedef struct mpscQueue { /* Data buffer */ _Alignas(CACHE_LINE_SIZE) _Atomic(void *) *buffer; + size_t queue_size; } mpscQueue; -void mpscInit(mpscQueue *q); +/* Initializes an MPSC queue with a size that must be a power of 2 */ +void mpscInit(mpscQueue *q, size_t queue_size); +/* Frees the MPSC queue's internal buffer and resets its state */ void mpscFree(mpscQueue *q); /* Pushes an item into the queue and returns true if the queue is not full. @@ -77,12 +74,6 @@ size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs); * SPMC QUEUE (Single-Producer Multi-Consumer) * ========================================================================== */ -#define SPMC_QUEUE_SIZE 4096 -#define SPMC_QUEUE_MASK (SPMC_QUEUE_SIZE - 1) -#ifndef __cplusplus -static_assert((SPMC_QUEUE_SIZE & (SPMC_QUEUE_SIZE - 1)) == 0, "SPMC_QUEUE_SIZE must be power of 2"); -#endif - typedef struct spmcCell { _Alignas(CACHE_LINE_SIZE) _Atomic(size_t) sequence; void *data; @@ -98,25 +89,26 @@ typedef struct spmcQueue { /* Data buffer */ _Alignas(CACHE_LINE_SIZE) spmcCell *buffer; + size_t queue_size; } spmcQueue; -void spmcInit(spmcQueue *q); +/* Initializes an SPMC queue with a size that must be a power of 2 */ +void spmcInit(spmcQueue *q, size_t queue_size); +/* Frees the SPMC queue's internal buffer and resets its state */ void spmcFree(spmcQueue *q); +/* Returns true if the SPMC queue has no items */ bool spmcIsEmpty(spmcQueue *q); +/* Returns an approximate number of items currently in the queue */ size_t spmcSize(spmcQueue *q); +/* Pushes an item to the SPMC queue. Returns true on success, false if the queue is full. */ bool spmcEnqueue(spmcQueue *q, void *data); +/* Pops and returns the next item from the queue, or NULL if the queue is empty */ void *spmcDequeue(spmcQueue *q); /* ========================================================================== * SPSC QUEUE (Single-Producer Single-Consumer) * ========================================================================== */ -#define SPSC_QUEUE_SIZE 4096 -#define SPSC_QUEUE_MASK (SPSC_QUEUE_SIZE - 1) -#ifndef __cplusplus -static_assert((SPSC_QUEUE_SIZE & (SPSC_QUEUE_SIZE - 1)) == 0, "SPSC_QUEUE_SIZE must be power of 2"); -#endif - typedef struct spscQueue { /* Consumer cache line */ _Alignas(CACHE_LINE_SIZE) _Atomic(size_t) head; @@ -129,16 +121,22 @@ typedef struct spscQueue { /* Dynamic buffer */ _Alignas(CACHE_LINE_SIZE) void **buffer; + size_t queue_size; } spscQueue; -void spscInit(spscQueue *q); +/* Initializes an SPSC queue with a size that must be a power of 2 */ +void spscInit(spscQueue *q, size_t queue_size); +/* Frees the SPSC queue's internal buffer and resets its state */ void spscFree(spscQueue *q); +/* Returns true if the queue is full, or false otherwise */ bool spscIsFull(spscQueue *q); /* Push data to the queue. Caller must ensure queue is not full via spscIsFull(). * If commit is true, the tail pointer is updated immediately (visible to consumer) else, * only local index is updated (batching). */ void spscEnqueue(spscQueue *q, void *data, bool commit); +/* Publishes any pending batched enqueues by advancing the shared tail pointer */ void spscCommit(spscQueue *q); +/* Pops up to num_jobs items from the queue and returns the actual number popped */ size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs); /* Check if queue is empty from producer's perspective. */ bool spscIsEmpty(spscQueue *q); diff --git a/src/rdb.c b/src/rdb.c index fd66147fc9b..b2b78660d16 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2503,6 +2503,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd decrRefCount(o); return NULL; } + /* See the RDB_TYPE_ZSET_LISTPACK case: a NAN score would crash the + * server when the zset is converted to a skiplist. The legacy + * ziplist format is converted to a listpack above, so apply the + * same NAN check on the resulting listpack. */ + if (!zzlValidateScores(lp)) { + rdbReportCorruptRDB("Zset ziplist with NAN score detected"); + zfree(lp); + zfree(encoded); + objectSetVal(o, NULL); + decrRefCount(o); + return NULL; + } zfree(objectGetVal(o)); o->type = OBJ_ZSET; @@ -2528,6 +2540,17 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd decrRefCount(o); return NULL; } + /* A NAN score would crash the server when the zset is converted to + * a skiplist (zslInsertNode asserts the score is not NAN). The + * skiplist RDB format rejects NAN scores at load time; do the same + * for the listpack format. */ + if (!zzlValidateScores(encoded)) { + rdbReportCorruptRDB("Zset listpack with NAN score detected"); + zfree(encoded); + objectSetVal(o, NULL); + decrRefCount(o); + return NULL; + } o->type = OBJ_ZSET; o->encoding = OBJ_ENCODING_LISTPACK; if (zsetLength(o) == 0) { @@ -2858,6 +2881,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error, int rd /* Set the NACK consumer, that was left to NULL when * loading the global PEL. Then set the same shared * NACK structure also in the consumer-specific PEL. */ + if (nack->consumer && nack->consumer != consumer) { + rdbReportCorruptRDB("NACK already assigned to a different consumer, " + "shared NACKs across consumers are not valid"); + decrRefCount(o); + return NULL; + } nack->consumer = consumer; if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) { rdbReportCorruptRDB("Duplicated consumer PEL entry " diff --git a/src/replication.c b/src/replication.c index 13cbc4183d6..d6a2e39c995 100644 --- a/src/replication.c +++ b/src/replication.c @@ -5220,8 +5220,8 @@ void handleBioThreadFinishedRDBDownload(void) { debugServerAssert(bio_save_state == REPL_BIO_DISK_SAVE_STATE_FINISHED); /* Bio termination detected - we can get rid of the state vars */ - int bio_repl_transfer_size = server.bio_repl_transfer_size; - int bio_repl_transfer_read = server.bio_repl_transfer_read; + off_t bio_repl_transfer_size = server.bio_repl_transfer_size; + off_t bio_repl_transfer_read = server.bio_repl_transfer_read; resetBioRDBSaveState(); serverLog(LL_NOTICE, "Replica main thread detected RDB download completion in Bio thread"); diff --git a/src/sentinel.c b/src/sentinel.c index 4aa7ce8c885..5cd4ead6a5f 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -4809,6 +4809,8 @@ int sentinelFailoverTo(sentinelValkeyInstance *ri, const sentinelAddr *addr, mst int sentinelKillClients(sentinelValkeyInstance *ri) { int retval; + if (ri->link->cc == NULL) return C_ERR; + /* 1) Rewrite the configuration (the instance just switched roles) * 2) Disconnect all clients (but this one sending the command) in order * to trigger the ask-master-on-reconnection protocol for connected diff --git a/src/server.c b/src/server.c index e018a6182ae..8403c548cc1 100644 --- a/src/server.c +++ b/src/server.c @@ -5321,7 +5321,10 @@ void addReplyCommandSubCommands(client *c, void (*reply_function)(client *, struct serverCommand *), int use_map) { if (!cmd->subcommands_ht) { - addReplySetLen(c, 0); + if (use_map) + addReplyMapLen(c, 0); + else + addReplyArrayLen(c, 0); return; } @@ -6526,8 +6529,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "replicas_repl_buffer_peak:%zu\r\n", server.pending_repl_data.peak)); if (server.repl_state == REPL_STATE_TRANSFER) { - int repl_transfer_size_stat; - int repl_transfer_read_stat; + off_t repl_transfer_size_stat; + off_t repl_transfer_read_stat; if (atomic_load_explicit(&server.replica_bio_disk_save_state, memory_order_acquire) != REPL_BIO_DISK_SAVE_STATE_NONE) { repl_transfer_size_stat = server.bio_repl_transfer_size; repl_transfer_read_stat = server.bio_repl_transfer_read; diff --git a/src/server.h b/src/server.h index fa6ebf3a8a3..735434a4b7d 100644 --- a/src/server.h +++ b/src/server.h @@ -195,17 +195,16 @@ struct ValkeyModule; /* Instantaneous metrics tracking. */ #define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */ typedef enum { - STATS_METRIC_COMMAND = 0, /* Number of commands executed. */ - STATS_METRIC_NET_INPUT, /* Bytes read from network. */ - STATS_METRIC_NET_OUTPUT, /* Bytes written to network. */ - STATS_METRIC_NET_INPUT_REPLICATION, /* Bytes read from network during replication. */ - STATS_METRIC_NET_OUTPUT_REPLICATION, /* Bytes written to network during replication. */ - STATS_METRIC_EL_CYCLE, /* Number of eventloop cycled. */ - STATS_METRIC_EL_DURATION, /* Eventloop duration. */ - STATS_METRIC_IO_WAIT, /* IO queue size */ - STATS_METRIC_MAIN_THREAD_CPU_SYS, /* Main thread CPU sys time */ - STATS_METRIC_MAIN_THREAD_CPU_USER, /* Main thread CPU user time */ - STATS_METRIC_COUNT /* Total count */ + STATS_METRIC_COMMAND = 0, /* Number of commands executed. */ + STATS_METRIC_NET_INPUT, /* Bytes read from network. */ + STATS_METRIC_NET_OUTPUT, /* Bytes written to network. */ + STATS_METRIC_NET_INPUT_REPLICATION, /* Bytes read from network during replication. */ + STATS_METRIC_NET_OUTPUT_REPLICATION, /* Bytes written to network during replication. */ + STATS_METRIC_EL_CYCLE, /* Number of eventloop cycled. */ + STATS_METRIC_EL_DURATION, /* Eventloop duration. */ + STATS_METRIC_IO_WAIT, /* IO queue size */ + STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, /* Main-thread active time */ + STATS_METRIC_COUNT /* Total count */ } instantaneous_metric_type; /* Protocol and I/O related defines */ @@ -3423,6 +3422,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, const_sds ele); zskiplistNode *zslNthInRange(zskiplist *zsl, zrangespec *range, long n, long *rank); sds zslGetNodeElement(const zskiplistNode *x); double zzlGetScore(unsigned char *sptr); +int zzlValidateScores(unsigned char *zl); void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range); diff --git a/src/setproctitle.c b/src/setproctitle.c index 952cf340ad4..bab01fb3c48 100644 --- a/src/setproctitle.c +++ b/src/setproctitle.c @@ -269,7 +269,7 @@ void spt_init(int argc, char *argv[]) { #ifndef SPT_MAXTITLE -#define SPT_MAXTITLE 255 +#define SPT_MAXTITLE 1024 #endif void setproctitle(const char *fmt, ...) { diff --git a/src/t_hash.c b/src/t_hash.c index 821b43bb4ac..7144ff2330b 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -1162,6 +1162,11 @@ void hgetdelCommand(client *c) { long long num_fields = 0; bool keyremoved = false; + if (strcasecmp(objectGetVal(c->argv[fields_index - 2]), "fields")) { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + if (getLongLongFromObjectOrReply(c, c->argv[fields_index - 1], &num_fields, NULL) != C_OK) return; /* Check that the parsed fields number matches the real provided number of fields */ @@ -2310,10 +2315,11 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { else { /* Hashtable encoding (generic implementation) */ unsigned long added = 0; + unsigned long maxtries = (count > ULONG_MAX / 10) ? ULONG_MAX : count * 10; listpackEntry field, value; hashtable *ht = hashtableCreate(&setHashtableType); hashtableExpand(ht, count); - while (added < count) { + while (added < count && maxtries--) { /* In case we were unable to locate random element, it is probably because there is no such element * since all elements are expired. */ if (hashTypeRandomElement(hash, size, &field, withvalues ? &value : NULL) != C_OK) diff --git a/src/t_zset.c b/src/t_zset.c index b2c651b9345..a564aba7c70 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -866,6 +866,25 @@ double zzlGetScore(unsigned char *sptr) { return score; } +/* Validate that none of the scores in a listpack-encoded sorted set is NAN. + * The structural layout (member, score, member, score, ...) must already have + * been validated by lpValidateIntegrityAndDups. Returns 1 if all scores are + * valid, 0 if a NAN score is found. This guards against crafted RESTORE + * payloads: zslInsertNode() asserts the score is not NAN, so a NAN score in a + * listpack zset would crash the server when it is later converted to a + * skiplist. The skiplist RDB format rejects NAN scores at load time; this is + * the equivalent check for the listpack format. */ +int zzlValidateScores(unsigned char *zl) { + unsigned char *eptr = lpSeek(zl, 0), *sptr; + while (eptr != NULL) { + sptr = lpNext(zl, eptr); + if (sptr == NULL) return 0; /* odd number of elements */ + if (isnan(zzlGetScore(sptr))) return 0; + eptr = lpNext(zl, sptr); + } + return 1; +} + /* Return a listpack element as an SDS string. */ sds lpGetObject(unsigned char *sptr) { unsigned char *vstr; diff --git a/src/tls.c b/src/tls.c index e0ec59d6fc7..6e8dda0d04d 100644 --- a/src/tls.c +++ b/src/tls.c @@ -1181,7 +1181,7 @@ static void updateSSLState(connection *conn_) { } static int getCertSubjectFieldByName(X509 *cert, const char *field, char *out, size_t outlen) { - if (!cert || !field || !out) return 0; + if (!cert || !field || !out || outlen == 0) return 0; int nid = -1; @@ -1193,10 +1193,33 @@ static int getCertSubjectFieldByName(X509 *cert, const char *field, char *out, s if (nid == -1) return 0; +#if OPENSSL_VERSION_NUMBER >= 0x30000000L + const X509_NAME *subject = X509_get_subject_name(cert); +#else X509_NAME *subject = X509_get_subject_name(cert); +#endif if (!subject) return 0; - return X509_NAME_get_text_by_NID(subject, nid, out, outlen) > 0; + /* X509_NAME_get_text_by_NID is deprecated in OpenSSL 4.0 */ + int idx = X509_NAME_get_index_by_NID(subject, nid, -1); + if (idx < 0) return 0; + + const X509_NAME_ENTRY *entry = X509_NAME_get_entry(subject, idx); + if (!entry) return 0; + + const ASN1_STRING *data = X509_NAME_ENTRY_get_data(entry); + if (!data) return 0; + + const unsigned char *str = ASN1_STRING_get0_data(data); + int len = ASN1_STRING_length(data); + if (!str || len <= 0) return 0; + + /* Copy to output buffer, ensuring null termination */ + size_t copy_len = (size_t)len < outlen - 1 ? (size_t)len : outlen - 1; + memcpy(out, str, copy_len); + out[copy_len] = '\0'; + + return 1; } /* Extract URI from Subject Alternative Name extension and return the first diff --git a/src/unit/test_queues.cpp b/src/unit/test_queues.cpp index 05b7687a6ee..729315b0908 100644 --- a/src/unit/test_queues.cpp +++ b/src/unit/test_queues.cpp @@ -22,6 +22,7 @@ extern "C" { class SpscQueueTest : public ::testing::Test { protected: spscQueue q; + static constexpr size_t SPSC_QUEUE_SIZE = 4096; struct ConsumerArg { spscQueue *q; @@ -29,7 +30,7 @@ class SpscQueueTest : public ::testing::Test { }; void SetUp() override { - spscInit(&q); + spscInit(&q, SPSC_QUEUE_SIZE); } void TearDown() override { @@ -139,6 +140,7 @@ TEST_F(SpscQueueTest, TestSpscConcurrent) { class SpmcQueueTest : public ::testing::Test { protected: spmcQueue q; + static constexpr size_t SPMC_QUEUE_SIZE = 4096; struct ConsumerArg { spmcQueue *q; @@ -147,7 +149,7 @@ class SpmcQueueTest : public ::testing::Test { }; void SetUp() override { - spmcInit(&q); + spmcInit(&q, SPMC_QUEUE_SIZE); ASSERT_NE(q.buffer, nullptr); EXPECT_EQ(reinterpret_cast(q.buffer) % CACHE_LINE_SIZE, 0u); } @@ -251,6 +253,7 @@ TEST_F(SpmcQueueTest, TestSpmcConcurrent) { class MpscQueueTest : public ::testing::Test { protected: mpscQueue q; + static constexpr size_t MPSC_QUEUE_SIZE = 16384; struct ProducerArg { mpscQueue *q; @@ -259,7 +262,7 @@ class MpscQueueTest : public ::testing::Test { }; void SetUp() override { - mpscInit(&q); + mpscInit(&q, MPSC_QUEUE_SIZE); } void TearDown() override { diff --git a/src/zipmap.c b/src/zipmap.c index 5214eaa5782..f4e12409dfc 100644 --- a/src/zipmap.c +++ b/src/zipmap.c @@ -201,6 +201,12 @@ int zipmapValidateIntegrity(unsigned char *zm, size_t size, int deep) { return 0; p += s; /* skip the encoded field size */ + /* Bounds-check the field length before advancing 'p'. Advancing the + * pointer first can wrap, since 'l' is an attacker-controlled 32-bit + * value, so the OUT_OF_RANGE check below could be bypassed. Compare + * against the bytes remaining in the zipmap in 64-bit space so the + * arithmetic cannot overflow on 32-bit platforms either (CWE-190). */ + if ((uint64_t)l > (uint64_t)(zm + size - 1 - p)) return 0; p += l; /* skip the field */ /* make sure the entry doesn't reach outside the edge of the zipmap */ @@ -216,8 +222,13 @@ int zipmapValidateIntegrity(unsigned char *zm, size_t size, int deep) { /* Sanity check: length < 254 must be encoded in 1 byte, not 5 bytes */ if (l < ZIPMAP_BIGLEN && s != 1) return 0; - p += s; /* skip the encoded value size*/ - e = *p++; /* skip the encoded free space (always encoded in one byte) */ + p += s; /* skip the encoded value size*/ + e = *p++; /* skip the encoded free space (always encoded in one byte) */ + /* Bounds-check 'l + e' before advancing 'p'. Both are unsigned int, + * so 'l + e' can wrap to a small value (e.g. l=0xFFFFFFFF, e=1) and + * slip past the OUT_OF_RANGE check below, defeating validation. Do + * the comparison in 64-bit space so it cannot overflow (CWE-190). */ + if ((uint64_t)l + e > (uint64_t)(zm + size - 1 - p)) return 0; p += l + e; /* skip the value and free space */ count++; diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 629b4ec3035..e8a5c3c3f98 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -39,6 +39,22 @@ test {corrupt payload: hash with valid zip list header, invalid entry len} { } } +test {corrupt payload: zipmap value length integer overflow} { + # A zipmap value length of 0xFFFFFFFF with a free byte of 1 makes the + # internal 'l + e' sum wrap to 0 in 32-bit arithmetic. Before the fix the + # validator advanced its cursor by the wrapped value and wrongly accepted + # the payload, which leads to out-of-bounds access on 32-bit builds. + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + r debug set-skip-checksum-validation 1 + catch { + r restore key 0 "\x09\x0c\x01\x03\x66\x6f\x6f\xfe\xff\xff\xff\xff\x01\xff\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00" + } err + assert_match "*Bad data format*" $err + verify_log_message 0 "*Zipmap integrity check failed*" 0 + assert_equal [r ping] "PONG" + } +} + test {corrupt payload: invalid zlbytes header} { start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { catch { @@ -780,6 +796,34 @@ test {corrupt payload: fuzzer findings - zset zslInsert with a NAN score} { } } +test {corrupt payload: zset listpack with NAN score} { + # A listpack-encoded sorted set whose score parses to NAN must be rejected + # on load. zslInsertNode() asserts the score is not NAN, so otherwise the + # server would crash when the zset is converted to a skiplist (e.g. when it + # grows past zset-max-listpack-entries). The skiplist RDB format already + # rejects NAN scores; this covers the listpack format. + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + r debug set-skip-checksum-validation 1 + catch {r restore _nan_zset_lp 0 "\x11\x19\x19\x00\x00\x00\x04\x00\x82\x6D\x31\x03\x83\x6E\x61\x6E\x04\x82\x6D\x32\x03\x83\x32\x2E\x35\x04\xFF\x50\x00\xC5\x5C\xC6\x0C\x7D\xFF\xB5\x52"} err + assert_match "*Bad data format*" $err + verify_log_message 0 "*NAN score*" 0 + assert_equal [r ping] "PONG" + } +} + +test {corrupt payload: zset ziplist with NAN score} { + # Same as the listpack case but for the legacy ziplist format, which is + # converted to a listpack on load. A NAN score must be rejected so it can + # not crash the server when the zset is later converted to a skiplist. + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + r debug set-skip-checksum-validation 1 + catch {r restore _nan_zset_zl 0 "\x0C\x1D\x1D\x00\x00\x00\x17\x00\x00\x00\x04\x00\x00\x02\x6D\x31\x04\x03\x6E\x61\x6E\x05\x02\x6D\x32\x04\x03\x32\x2E\x35\xFF\x0B\x00\x00\x00\x00\x00\x00\x00\x00\x00"} err + assert_match "*Bad data format*" $err + verify_log_message 0 "*NAN score*" 0 + assert_equal [r ping] "PONG" + } +} + test {corrupt payload: fuzzer findings - streamLastValidID panic} { start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { r config set sanitize-dump-payload yes diff --git a/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl b/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl new file mode 100644 index 00000000000..1dd6f8a9240 --- /dev/null +++ b/tests/sentinel/tests/19-coordinated-failover-killed-link.tcl @@ -0,0 +1,67 @@ +# Regression test for coordinated failover when the Sentinel command link to +# the old primary is disconnected just as the promoted replica reports its new +# role. In that window Sentinel must not dereference a NULL command link while +# doing its best-effort client cleanup on the old primary. + +source "../tests/includes/init-tests.tcl" + +foreach_sentinel_id id { + S $id sentinel debug info-period 1000 + S $id sentinel debug publish-period 100 + S $id sentinel debug ping-period 100 +} + +proc sentinel_cmd_client_name {sentinel_id} { + set myid [S $sentinel_id SENTINEL MYID] + return "sentinel-[string range $myid 0 7]-cmd" +} + +test "Coordinated failover tolerates old primary command link disconnect" { + set sentinel_id 0 + set old_master_id $master_id + set old_port [RPort $old_master_id] + set old_addr [S $sentinel_id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster] + assert {[lindex $old_addr 1] == $old_port} + + set sentinel_client_name [sentinel_cmd_client_name $sentinel_id] + + wait_for_condition 1000 50 { + [string match "*name=$sentinel_client_name*" [R $old_master_id CLIENT LIST]] + } else { + fail "Sentinel command client $sentinel_client_name was not connected to the old primary" + } + + wait_for_condition 300 50 { + [catch {S $sentinel_id SENTINEL FAILOVER mymaster COORDINATED}] == 0 + } else { + catch {S $sentinel_id SENTINEL FAILOVER mymaster COORDINATED} reply + fail "Sentinel manual coordinated failover did not start, got: $reply" + } + + wait_for_condition 1000 10 { + [dict get [S $sentinel_id SENTINEL PRIMARY mymaster] failover-state] eq {wait_promotion} + } else { + fail "Sentinel did not reach wait_promotion state" + } + + # Keep the command connection from this Sentinel to the old primary closed + # while the next INFO reply from the promoted replica advances the failover. + # Before the fix, sentinelKillClients(old-primary) calls + # valkeyAsyncCommand(NULL, ...) in this window, and the Sentinel crashes. + S $sentinel_id sentinel debug ping-period 10000 + catch {R $old_master_id CLIENT KILL NAME $sentinel_client_name} + + wait_for_condition 1000 10 { + [string match "*disconnected*" [dict get [S $sentinel_id SENTINEL PRIMARY mymaster] flags]] + } else { + fail "Sentinel command link to the old primary did not disconnect" + } + + assert_equal {PONG} [S $sentinel_id PING] + + wait_for_condition 1000 50 { + [lindex [S $sentinel_id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster] 1] != $old_port + } else { + fail "Sentinel did not complete the coordinated failover" + } +} diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 832f3ca8654..65afff98907 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -364,8 +364,9 @@ proc spawn_server {executable config_file stdout stderr args} { read stdin 1 } - # Tell the test server about this new instance. - send_data_packet $::test_server_fd server-spawned "$pid - $::curfile" + # Tell the test server about this new instance. Send the log path too so + # the orchestrator can dump it if the test times out. + send_data_packet $::test_server_fd server-spawned [list $pid $stdout $::curfile] return $pid } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index c70f5c6395a..d95a54c036b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -78,6 +78,7 @@ set ::failures_output_file ""; # If set, write failures JSON to this path set ::timeout 1200; # 20 minutes without progresses will quit the test. set ::last_progress [clock seconds] set ::active_servers {} ; # Pids of active server instances. +array set ::server_logs {} ; # Maps server pid -> stdout log path. set ::dont_clean 0 set ::dont_pre_clean 0 set ::wait_server 0 @@ -418,6 +419,7 @@ proc test_server_cron {} { } } show_clients_state + dump_server_logs force_kill_all_servers kill_clients the_end @@ -509,13 +511,15 @@ proc read_from_test_client fd { } elseif {$status eq {server-spawning}} { set ::active_clients_task($fd) "(SPAWNING SERVER) $data" } elseif {$status eq {server-spawned}} { - set pid [string trim [lindex [split $data "-"] 0]] + set pid [string trim [lindex $data 0]] + set ::server_logs($pid) [lindex $data 1] lappend ::active_servers $pid - set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data" + set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$pid" } elseif {$status eq {server-killing}} { set ::active_clients_task($fd) "(KILLING SERVER) pid:$data" } elseif {$status eq {server-killed}} { set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data] + unset -nocomplain ::server_logs($data) set ::active_clients_task($fd) "(KILLED SERVER) pid:$data" } elseif {$status eq {run_solo}} { lappend ::run_solo_tests $data @@ -545,6 +549,18 @@ proc kill_clients {} { } } +# Print the tail of each still-running server's log. Called on timeout so the +# log of the stuck server is visible in CI output for troubleshooting. +proc dump_server_logs {} { + foreach pid $::active_servers { + if {![info exists ::server_logs($pid)]} continue + set log $::server_logs($pid) + if {![file exists $log]} continue + puts "\n=== Server log (pid $pid): $log ===" + catch {puts [exec tail -n 100 $log]} + } +} + proc force_kill_all_servers {} { foreach p $::active_servers { puts "Killing still running Valkey server $p" diff --git a/tests/unit/acl-v2.tcl b/tests/unit/acl-v2.tcl index 2158e10324b..6822c68154d 100644 --- a/tests/unit/acl-v2.tcl +++ b/tests/unit/acl-v2.tcl @@ -347,6 +347,24 @@ start_server {tags {"acl external:skip"}} { assert_equal "" [dict get $secondary_selector databases] } + test {Test ACL LIST omits implicit alldbs} { + set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user default*"]] + assert_no_match "* alldbs *" $response + + r ACL SETUSER user-list-alldbs on nopass -@all +get ~* &* alldbs + set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]] + assert_no_match "* alldbs *" $response + + r ACL SETUSER user-list-alldbs db=0,1 + set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]] + assert_match "* db=0,1 *" $response + + r ACL SETUSER user-list-alldbs resetdbs + set response [lindex [r ACL LIST] [lsearch [r ACL LIST] "user user-list-alldbs*"]] + assert_match "* resetdbs *" $response + + r ACL DELUSER user-list-alldbs + } test {Test ACL list idempotency} { r ACL SETUSER user-idempotency off -@all +get resetchannels &channel1 %R~foo1 %W~bar1 ~baz1 (-@all +set resetchannels &channel2 %R~foo2 %W~bar2 ~baz2) diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index c1b827c42b0..bf262447a35 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -179,7 +179,7 @@ start_server {tags {"acl external:skip"}} { set curruser "hpuser" foreach user [lshuffle $users] { if {[string first $curruser $user] != -1} { - assert_equal {user hpuser on nopass sanitize-payload resetchannels &foo alldbs +@all} $user + assert_equal {user hpuser on nopass sanitize-payload resetchannels &foo +@all} $user } } @@ -1145,9 +1145,16 @@ start_server [list overrides [list "dir" $server_path "acl-pubsub-default" "allc } {} {needs:debug} test {ACL load and save} { - r ACL setuser eve +get allkeys >eve on + r ACL setuser eve +get allkeys >eve on alldbs r ACL save + # ACL SAVE uses the same serialization as ACL LIST, + # verify that ACL file omits the implicit alldbs rule. + set aclfile [file join \ + [lindex [r CONFIG GET dir] 1] \ + [lindex [r CONFIG GET aclfile] 1]] + assert_equal 0 [count_message_lines $aclfile alldbs] + r ACL load # Clients should not be disconnected since permissions haven't changed @@ -1357,6 +1364,12 @@ start_server {overrides {user "default on nopass ~* +@all -flushdb"} tags {acl e test {ACL from config file and config rewrite} { assert_error {NOPERM *} {r flushdb} r config rewrite + + # CONFIG REWRITE persists ACL users through the ACL string + # serializer, and should not have the implicit alldbs rule. + set config_file [srv 0 config_file] + assert_equal 0 [count_message_lines $config_file alldbs] + restart_server 0 true false assert_error {NOPERM *} {r flushdb} } diff --git a/tests/unit/cluster/clusterscan.tcl b/tests/unit/cluster/clusterscan.tcl index e65a5d5f6db..83a41fdf880 100644 --- a/tests/unit/cluster/clusterscan.tcl +++ b/tests/unit/cluster/clusterscan.tcl @@ -621,10 +621,20 @@ start_cluster 3 0 {tags {external:skip cluster}} { # With full-coverage=yes the cluster enters FAIL state. # Cursors for slot 0 should get "Hash slot not served". # Cursors for assigned but remote slots should get "cluster is down". - R 0 CLUSTER DELSLOTS 0 - catch {R 1 CLUSTER DELSLOTS 0} - catch {R 2 CLUSTER DELSLOTS 0} - wait_for_cluster_state fail + # + # Retry DELSLOTS in a loop: R0's old stale packet can rebind slot 0 + # to R0 on R1/R2 and undoing the DELSLOTS. Loop until all nodes converge + # to FAIL with slot 0 unassigned. + wait_for_condition 1000 50 { + [catch {R 0 CLUSTER DELSLOTS 0}] >= 0 && + [catch {R 1 CLUSTER DELSLOTS 0}] >= 0 && + [catch {R 2 CLUSTER DELSLOTS 0}] >= 0 && + [CI 0 cluster_state] eq "fail" && + [CI 1 cluster_state] eq "fail" && + [CI 2 cluster_state] eq "fail" + } else { + fail "Cluster did not converge to FAIL after DELSLOTS" + } # Unassigned slot -> specific error. assert_error {CLUSTERDOWN Hash slot not served} {R 0 clusterscan "0-{06S}-0"} diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index a3b4cd13abe..e03e30c1fb8 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -470,4 +470,38 @@ start_server {tags {"dump"}} { r debug set-skip-checksum-validation 0 assert_match {*Bad data format*} $err } {} {needs:debug} + + test {RESTORE rejects stream with shared NACK across consumers} { + # Create a valid stream with consumer group and two consumers + r DEL mystream + r XADD mystream 1-1 f v + r XADD mystream 2-1 f v + r XGROUP CREATE mystream grp 0 + r XREADGROUP GROUP grp consumer1 COUNT 1 STREAMS mystream ">" + r XREADGROUP GROUP grp consumer2 COUNT 1 STREAMS mystream ">" + + set dump [r DUMP mystream] + r DEL mystream + + # In the dump, consumer2's PEL entry contains message ID 2-1. + # Replace it with 1-1 (same as consumer1's) to create a shared NACK. + # Message ID 2-1 in big-endian: \x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01 + # Message ID 1-1 in big-endian: \x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01 + # Find the last occurrence of ID 2-1 (in consumer2's PEL) and replace with 1-1 + set id_2_1 "\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01" + set id_1_1 "\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01" + + # Find the last occurrence (consumer2's PEL entry, not the global PEL) + set last_pos [string last $id_2_1 $dump] + if {$last_pos == -1} { + fail "Could not find message ID 2-1 in dump" + } + set corrupt_dump [string replace $dump $last_pos [expr {$last_pos + 15}] $id_1_1] + + # Skip CRC validation since we modified the payload + r debug set-skip-checksum-validation 1 + catch {r RESTORE mystream 0 $corrupt_dump} err + r debug set-skip-checksum-validation 0 + assert_match {*Bad data format*} $err + } {} {needs:debug} } diff --git a/tests/unit/hashexpire.tcl b/tests/unit/hashexpire.tcl index 00b983cad7a..61f5a549efc 100644 --- a/tests/unit/hashexpire.tcl +++ b/tests/unit/hashexpire.tcl @@ -1540,6 +1540,29 @@ start_server {tags {"hashexpire"}} { } } + test "HRANDFIELD - CASE 4: does not loop forever when valid fields fewer than count" { + r FLUSHALL + r DEBUG SET-ACTIVE-EXPIRE 0 + + # 71 expired fields + 29 valid fields = 100 total + # count=30, count*3=90 < 100 -> CASE 4 + for {set i 1} {$i <= 71} {incr i} { + r HSETEX myhash PX 1 FIELDS 1 f$i v$i + } + for {set i 72} {$i <= 100} {incr i} { + r HSET myhash f$i v$i + } + + # Wait for fields to expire + after 100 + assert_equal 100 [r HLEN myhash] + + # Should return at most 29 valid fields without + assert_lessthan_equal [llength $result] 29 + + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + test "HRANDFIELD - returns null response when all fields are expired" { r FLUSHALL r DEBUG SET-ACTIVE-EXPIRE 0 diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl index ef5a70ce8e1..bd8f912e9f4 100644 --- a/tests/unit/introspection-2.tcl +++ b/tests/unit/introspection-2.tcl @@ -265,6 +265,34 @@ start_server {tags {"introspection"}} { assert_equal {{}} [r command info config|get|key] } + test {COMMAND INFO subcommands field uses array not set type in RESP3 for commands with no subcommands} { + r hello 3 + r readraw 1 + r deferred 1 + r command info ping + assert_equal [r read] {*1} ;# outer array: 1 command + assert_equal [r read] {*10} ;# command info: 10 fields + assert_equal [r read] {$4} ;# name (bulk string type) + assert_equal [r read] {ping} ;# name value + assert_equal [r read] {:-1} ;# arity + set flags_hdr [r read] ;# flags (~N set) + for {set i 0} {$i < [string range $flags_hdr 1 end]} {incr i} { r read } + assert_equal [r read] {:0} ;# first_key + assert_equal [r read] {:0} ;# last_key + assert_equal [r read] {:0} ;# step + set acl_hdr [r read] ;# acl_categories (~N set) + for {set i 0} {$i < [string range $acl_hdr 1 end]} {incr i} { r read } + set tips_hdr [r read] ;# tips (~N set) + set tips_count [string range $tips_hdr 1 end] + for {set i 0} {$i < $tips_count} {incr i} { r read ; r read } + assert_equal [r read] {~0} ;# key_specs: correctly a Set + set subcommands_hdr [r read] ;# subcommands: should be Array not Set + r readraw 0 + r deferred 0 + r hello 2 + assert_equal {*0} $subcommands_hdr + } {} {resp3} + foreach cmd {SET GET MSET BITFIELD LMOVE LPOP BLPOP PING MEMORY MEMORY|USAGE RENAME GEORADIUS_RO} { test "$cmd command will not be marked with movablekeys" { set info [lindex [r command info $cmd] 0] diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index 9c1cfa8ba4d..604fe9981ec 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -94,6 +94,42 @@ tags "modules" { assert_equal [r get testkeyspace:expired] 1 } + test {MOVE restores client DB after module keyspace notification} { + # MOVE fires move_from/move_to notifications on the source and + # destination DBs. The module subscribes to NOTIFY_GENERIC, and + # the bug left the client selected on the destination DB. + r flushall + r select 0 + r set movekey value + + # The client must stay on its original DB (c->db unchanged). + assert_equal 1 [r move movekey 10] + assert_match {*db=0*} [r client info] + + # If the client was still on db10, this would fail with + # "source and destination objects are the same" instead of 0. + assert_equal 0 [r move movekey 10] + assert_match {*db=0*} [r client info] + } {} {singledb:skip} + + test {COPY restores client DB after module keyspace notification} { + # COPY fires a copy_to notification on the destination DB. The + # module subscribes to NOTIFY_GENERIC, and the bug left the client + # selected on the destination DB. + r flushall + r select 0 + r set copykey value + + # The client must stay on its original DB (c->db unchanged). + assert_equal 1 [r copy copykey copykey DB 10] + assert_match {*db=0*} [r client info] + + # If the client was still on db10, this would fail with + # "source and destination objects are the same" instead of 0 + assert_equal 0 [r copy copykey copykey DB 10] + assert_match {*db=0*} [r client info] + } {} {singledb:skip} + test "Unload the module - testkeyspace" { assert_equal {OK} [r module unload testkeyspace] } diff --git a/tests/unit/moduleapi/usercall.tcl b/tests/unit/moduleapi/usercall.tcl index 80c9f883de8..29e5ff9797c 100644 --- a/tests/unit/moduleapi/usercall.tcl +++ b/tests/unit/moduleapi/usercall.tcl @@ -29,7 +29,7 @@ start_server {tags {"modules usercall network"}} { assert_equal [r usercall.reset_user] OK assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK # off and sanitize-payload because module user / default value - assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set" + assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set" # doesn't fail for regular commands as just testing acl here assert_equal [r usercall.$cmd {} set x 10] OK @@ -48,7 +48,7 @@ start_server {tags {"modules usercall network"}} { assert_equal [r usercall.reset_user] OK assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK # off and sanitize-payload because module user / default value - assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set" + assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set" # fails here as testing acl in rm call assert_error {*NOPERM User module_user has no permissions*} {r usercall.$cmd C set x 10} @@ -111,7 +111,7 @@ start_server {tags {"modules usercall network"}} { assert_equal [r usercall.reset_user] OK assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK # off and sanitize-payload because module user / default value - assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* alldbs +@all -set" + assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set" # passes as not checking ACL assert_equal [r usercall.$cmd {} evalsha $sha_set 0] 1 diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 46f4b1b7bbe..529b6ca4eca 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -585,8 +585,10 @@ start_server {tags {"other external:skip"}} { start_server {tags {"other external:skip"}} { test "test io-threads are runtime modifiable" { + # Each toggle spawns/joins real pthreads; too slow for 100 iterations under Valgrind. + set iterations [expr {$::valgrind ? 10 : 100}] # Randomly set the number of threads between 1 and 5 - for {set i 0} {$i < 100} {incr i} { + for {set i 0} {$i < $iterations} {incr i} { set random_num [expr {int(rand() * 5) + 1}] r config set io-threads $random_num set thread_num [lindex [r config get io-threads] 1] diff --git a/tests/unit/type/hash.tcl b/tests/unit/type/hash.tcl index 1b519652048..a089d5bb9c7 100644 --- a/tests/unit/type/hash.tcl +++ b/tests/unit/type/hash.tcl @@ -512,7 +512,7 @@ start_server {tags {"hash"}} { } test {HGETDEL - check for syntax and type errors} { - assert_error "*value is not an integer or out of range" {r hgetdel myhash a b c} + assert_error "*ERR syntax error" {r hgetdel myhash a 1 c} assert_error "*value is not an integer or out of range" {r hgetdel myhash FIELDS a b c} assert_error "*numfields should be greater than 0 and match the provided number of fields" {r hgetdel myhash FIELDS 2 a b c} assert_error "*numfields should be greater than 0 and match the provided number of fields" {r hgetdel myhash FIELDS 4 a b c}