Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ ENGINE_SERVER_OBJ = \
ziplist.o \
zipmap.o \
zmalloc.o \
queues.o
queues.o
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ = \
Expand Down
2 changes: 1 addition & 1 deletion src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ static sds ACLDescribeSelector(aclSelector *selector) {

/* Database permissions. */
if (selector->flags & SELECTOR_FLAG_ALLDBS) {
res = sdscatlen(res, "alldbs ", 7);
/* alldbs is default, avoid emitting it in ACL strings for compatibility. */
} else if (intsetLen(selector->dbs) == 0) {
res = sdscatlen(res, "resetdbs ", 9);
} else {
Expand Down
84 changes: 38 additions & 46 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include "queues.h"
#include <sys/resource.h>

/* Sizes handed to the queue initializers used by the I/O threads:
 *   IO_MPSC_QUEUE_SIZE - passed to mpscInit() for io_shared_outbox
 *   IO_SPMC_QUEUE_SIZE - passed to spmcInit() for io_shared_inbox
 *   IO_SPSC_QUEUE_SIZE - passed to spscInit() for each io_private_inbox[id]
 * NOTE(review): presumably a capacity in queue entries rather than bytes -
 * confirm against queues.h. */
#define IO_MPSC_QUEUE_SIZE 16384
#define IO_SPMC_QUEUE_SIZE 4096
#define IO_SPSC_QUEUE_SIZE 4096

static _Thread_local int thread_id = 0;
static _Thread_local mpscTicket io_thread_ticket = {0};
/* Backlog of responses when io_shared_outbox is full. Should be rare. */
Expand Down Expand Up @@ -44,8 +48,8 @@ static inline void untagJob(void *tagged_ptr, void **ptr, int *type) {
/* Handler prototypes */
void ioThreadReadQueryFromClient(client *c);
void ioThreadWriteToClient(client *c);
void IOThreadFreeArgv(robj **argv);
void IOThreadPoll(aeEventLoop *el);
void ioThreadFreeArgv(robj **argv);
void ioThreadPoll(aeEventLoop *el);
static void ioThreadAccept(client *c);

int inMainThread(void) {
Expand Down Expand Up @@ -107,9 +111,6 @@ void waitForClientIO(client *c) {
}

void IOThreadsBeforeSleep(long long current_time) {
#ifndef RUSAGE_THREAD
UNUSED(current_time);
#endif
if (server.io_threads_num == 1) return;
serverAssert(inMainThread());

Expand All @@ -126,29 +127,23 @@ void IOThreadsBeforeSleep(long long current_time) {
}
}

#ifdef RUSAGE_THREAD
/* If threads are not active track main thread CPU time for ignition decision */
/* If threads are not active, track main-thread active time for ignition decision */
if (server.active_io_threads_num == 1) {
static long long last_measurement_time = 0;
if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */
last_measurement_time = current_time;
struct rusage ru;
if (getrusage(RUSAGE_THREAD, &ru) == 0) {
long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec;
long long user_time_us = ru.ru_utime.tv_sec * 1000000LL + ru.ru_utime.tv_usec;
trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000);
trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER, user_time_us, current_time, 1000000);
}
trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, server.stat_active_time, current_time, 1000000);
}
#endif
}

#define IO_COOLDOWN_MS 1000
#define IO_SAMPLE_RATE_MS 10
#define IO_IGNITION_EVENTS 4
#define IO_IGNITION_CPU_SYS 30.0
#define IO_IGNITION_CPU_SYS_LOW 5.0
#define IO_IGNITION_CPU_USER 50.0
/* Start using I/O threads once the main thread has been active for more than
 * the percentage of time defined below. The value was picked somewhat
 * arbitrarily: it needs to be low enough that the next thread is started
 * quickly, yet high enough that we do not start threads unnecessarily and
 * cause contention. */
#define IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT 30
#define BATCH_SIZE 32

void IOThreadsAfterSleep(int numevents) {
Expand All @@ -171,15 +166,9 @@ void IOThreadsAfterSleep(int numevents) {
/* Ignition Policy */
if (server.active_io_threads_num == 1) {
int should_ignite = 0;
#ifdef RUSAGE_THREAD
float cpu_sys = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS) / 10000.0;
float cpu_user = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER) / 10000.0;
/* Ignite IO threads if sys CPU > 30%, or if sys CPU > 5% and user CPU > 50% */
should_ignite = (cpu_sys > IO_IGNITION_CPU_SYS) ||
(cpu_sys > IO_IGNITION_CPU_SYS_LOW && cpu_user > IO_IGNITION_CPU_USER);
#else
should_ignite = (numevents >= IO_IGNITION_EVENTS);
#endif
float main_thread_active_time = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME) / 10000.0;
/* Ignite IO threads when main-thread active time exceeds the threshold (30%) */
should_ignite = (main_thread_active_time > (float)IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT);
if (should_ignite) {
pthread_mutex_unlock(&io_threads_mutex[1]);
server.active_io_threads_num++;
Expand Down Expand Up @@ -243,7 +232,7 @@ void IOThreadsAfterSleep(int numevents) {

/* This function performs polling on the given event loop and updates the server's
* IO fired events count and poll state. */
void IOThreadPoll(aeEventLoop *el) {
void ioThreadPoll(aeEventLoop *el) {
struct timeval tvp = {0, 0};
int num_events = aePoll(el, &tvp);
server.io_ae_fired_events = num_events;
Expand Down Expand Up @@ -326,10 +315,10 @@ static void *IOThreadMain(void *myid) {

switch (type) {
case JOB_REQ_FREE_ARGV:
IOThreadFreeArgv((robj **)data);
ioThreadFreeArgv((robj **)data);
break;
case JOB_REQ_POLL:
IOThreadPoll((aeEventLoop *)data);
ioThreadPoll((aeEventLoop *)data);
break;
default:
serverPanic("Invalid SPSC job type: %d", type);
Expand All @@ -338,10 +327,8 @@ static void *IOThreadMain(void *myid) {
processed += batch_count;
}

/*
* PRIORITY 2: Shared Global Queue (SPMC)
* Only checked after SPSC is drained.
*/
/* PRIORITY 2: Shared Global Queue (SPMC)
* Only checked after SPSC is drained. */
void *tagged_job = spmcDequeue(&io_shared_inbox);
if (tagged_job) {
void *data;
Expand All @@ -362,7 +349,7 @@ static void *IOThreadMain(void *myid) {
ioThreadAccept((client *)data);
break;
case JOB_REQ_POLL:
IOThreadPoll((aeEventLoop *)data);
ioThreadPoll((aeEventLoop *)data);
break;
default:
serverPanic("Invalid SPMC job type: %d", type);
Expand Down Expand Up @@ -398,7 +385,7 @@ static void createIOThread(int id) {
serverAssert(id > 0 && id < server.io_threads_num);

/* Initialize the private SPSC queue for this thread */
spscInit(&io_private_inbox[id]);
spscInit(&io_private_inbox[id], IO_SPSC_QUEUE_SIZE);

pthread_t tid;
pthread_mutex_init(&io_threads_mutex[id], NULL);
Expand Down Expand Up @@ -458,7 +445,7 @@ int updateIOThreads(const char **err) {
* in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */
size_t pending = getPendingIOResponsesCount();

if (pending > MPSC_QUEUE_SIZE) {
if (pending > io_shared_outbox.queue_size) {
if (err) *err = "Can't update IO threads under load, try again later";
return 0;
}
Expand Down Expand Up @@ -497,8 +484,8 @@ void initIOThreads(int prev_threads_num) {
server.active_io_threads_num = 1; /* We start with threads not active. */
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;
spmcInit(&io_shared_inbox);
mpscInit(&io_shared_outbox);
spmcInit(&io_shared_inbox, IO_SPMC_QUEUE_SIZE);
mpscInit(&io_shared_outbox, IO_MPSC_QUEUE_SIZE);
io_jobs_submitted = 0;
atomic_init(&io_jobs_finished, 0);
prefetchCommandsBatchInit();
Expand Down Expand Up @@ -560,6 +547,7 @@ int trySendWriteToIOThreads(client *c) {
if (c->flag.lua_debug) return C_ERR;

int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
clientReplyBlock *block = NULL;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
Expand All @@ -571,14 +559,10 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
/* If buffer is encoded force new header */
if (block->flag.buf_encoded) block->last_header = NULL;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
/* If buffer is encoded force new header */
if (c->flag.buf_encoded) c->last_header = NULL;
}
}

Expand All @@ -597,7 +581,15 @@ int trySendWriteToIOThreads(client *c) {
c->io_last_bufpos = 0;
return C_ERR;
}

/* Force new header after successful enqueue so the main thread doesn't
* extend a header the I/O thread is currently reading. */
if (!is_replica) {
if (block) {
if (block->flag.buf_encoded) block->last_header = NULL;
} else {
if (c->flag.buf_encoded) c->last_header = NULL;
}
}
if (c->flag.pending_write) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flag.pending_write = 0;
Expand All @@ -609,7 +601,7 @@ int trySendWriteToIOThreads(client *c) {
}

/* Internal function to free the client's argv in an IO thread. */
void IOThreadFreeArgv(robj **argv) {
void ioThreadFreeArgv(robj **argv) {
int last_arg = 0;
for (int i = 0;; i++) {
robj *o = argv[i];
Expand Down
6 changes: 2 additions & 4 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ typedef enum {
JOB_REQ_ACCEPT,
JOB_REQ_COUNT
} JobRequest;
_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");
_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 8 for pointer arithmetic");

typedef enum {
JOB_RES_READ_CLIENT = 0,
JOB_RES_WRITE_CLIENT,
JOB_RES_COUNT
} JobResult;
_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic");

typedef void (*job_handler)(void *);
_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 8 for pointer arithmetic");

void initIOThreads(int prev_threads_num);
void killIOThreads(void);
Expand Down
11 changes: 11 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -9494,6 +9494,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
/* Remove irrelevant flags from the type mask */
type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);

/* When notifying via the executing client, the callbacks below select
* 'dbid' on its context. 'dbid' may differ from the client's currently
* selected DB (e.g. MOVE/COPY notify on the destination DB), so save the
* original DB and restore it afterwards to avoid leaving the client on the
* wrong DB for subsequent commands. */
client *executing_client = server.executing_client;
int origin_dbid = (executing_client != NULL) ? executing_client->db->id : -1;

while ((ln = listNext(&li))) {
ValkeyModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching the registration,
Expand Down Expand Up @@ -9523,6 +9531,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
}
}

/* Restore the executing client's originally selected DB. */
if (executing_client != NULL) selectDb(executing_client, origin_dbid);

exitExecutionUnit();
}

Expand Down
19 changes: 2 additions & 17 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -6532,31 +6532,16 @@ void evictClients(void) {
size_t client_eviction_limit = getClientEvictionLimit();
if (client_eviction_limit == 0) return;

/* Variable to track memory of clients marked for close but not yet freed */
size_t pending_freed = 0;

while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
pending_freed >
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >
client_eviction_limit) {
listNode *ln = listNext(&bucket_iter);
if (ln) {
client *c = ln->value;
if (c->flag.close_asap) {
/* Already scheduled to close. Count memory as freed and skip. */
pending_freed += getClientMemoryUsage(c, NULL);
continue;
}
sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Evicting client: %s", ci);
if (freeClient(c)) server.stat_evictedclients++;
sdsfree(ci);
server.stat_evictedclients++;

if (freeClient(c) == 0) {
/* Protected client (async close). Count memory as freed and skip. */
pending_freed += getClientMemoryUsage(c, NULL);
continue;
}
} else {
curr_bucket--;
if (curr_bucket < 0) {
Expand Down
Loading
Loading