Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
*/

#include "memory_prefetch.h"
#include "adlist.h"
#include "server.h"
#include "io_threads.h"
#include <stdlib.h>

typedef enum {
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
Expand Down Expand Up @@ -214,7 +216,7 @@ static void prefetchCommands(void) {
}

/* Processes all the prefetched commands in the current batch. */
void processClientsCommandsBatch(void) {
void processClientsCommandsBatch(list *handled_clients) {
if (!batch || batch->client_count == 0) return;

/* If executed_commands is not 0,
Expand All @@ -231,7 +233,10 @@ void processClientsCommandsBatch(void) {
/* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */
batch->clients[i] = NULL;
batch->executed_commands++;
if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c);
if (processPendingCommandAndInputBuffer(c) != C_ERR) {
beforeNextClient(c);
if (handled_clients) listAddNodeTail(handled_clients, c);
}
}

resetCommandsBatch();
Expand Down Expand Up @@ -260,7 +265,7 @@ static void addCommandToBatch(struct serverCommand *cmd, robj **argv, int argc,
* if it becomes full.
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatchAndProcessIfFull(client *c) {
int addCommandToBatchAndProcessIfFull(client *c, list *handled_clients) {
if (!batch) return C_ERR;

batch->clients[batch->client_count++] = c;
Expand All @@ -283,7 +288,7 @@ int addCommandToBatchAndProcessIfFull(client *c) {
* We also check the client count to handle cases where
* no keys exist for the clients' commands. */
if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) {
processClientsCommandsBatch();
processClientsCommandsBatch(handled_clients);
}

return C_OK;
Expand Down
6 changes: 4 additions & 2 deletions src/memory_prefetch.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#ifndef MEMORY_PREFETCH_H
#define MEMORY_PREFETCH_H

#include "adlist.h"

struct client;

void prefetchCommandsBatchInit(void);
void processClientsCommandsBatch(void);
int addCommandToBatchAndProcessIfFull(struct client *c);
void processClientsCommandsBatch(list *handled_clients);
int addCommandToBatchAndProcessIfFull(struct client *c, list *handled_clients);
void removeClientFromPendingCommandsBatch(struct client *c);
int onMaxBatchSizeChange(const char **err);

Expand Down
27 changes: 22 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "adlist.h"
#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
Expand All @@ -40,6 +41,7 @@
#include "module.h"
#include "connection.h"
#include "zmalloc.h"
#include <stdint.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/uio.h>
Expand Down Expand Up @@ -6372,11 +6374,15 @@ int processIOThreadsReadDone(void) {
if (ProcessingEventsWhileBlocked) {
/* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively.
* In this case, there may be some clients left in the batch waiting to be processed. */
processClientsCommandsBatch();
processClientsCommandsBatch(NULL);
}

if (listLength(server.clients_pending_io_read) == 0) return 0;
int processed = 0;
/* Collect clients handled in this iteration to continue parsing any
* residual buffered data synchronously after batch execution (important
* for edge-triggered transports like RDMA). */
list *handled_clients = listCreate();
listNode *ln;

listNode *next = listFirst(server.clients_pending_io_read);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This communication between IO threads and main thread is being changed completely in #3324. Lock-free queues are used instead of looping over the clients and checking each one if their IO is finished.

These lines are being deleted: https://github.com/valkey-io/valkey/pull/3324/changes#diff-252bce0cc340542712f0c1adf62e9035ea47a4a064321fbf40ec3dd4b814aaf2L6378

I think we can merge this fix first, so we can backport it to 9.0 and 8.x. (When was RDMA introduced?) But it might need to be rewritten later when #3324 gets merged.

Expand Down Expand Up @@ -6412,7 +6418,6 @@ int processIOThreadsReadDone(void) {
/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);

/* In accept state, no client's data was read - stop here. */
if (in_accept_state) continue;
Expand All @@ -6439,18 +6444,30 @@ int processIOThreadsReadDone(void) {

size_t list_length_before_command_execute = listLength(server.clients_pending_io_read);
/* try to add the command to the batch */
int ret = addCommandToBatchAndProcessIfFull(c);
int ret = addCommandToBatchAndProcessIfFull(c, handled_clients);
/* If the command was not added to the commands batch, process it immediately */
if (ret == C_ERR) {
if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c);
if (processPendingCommandAndInputBuffer(c) == C_OK) {
beforeNextClient(c);
listAddNodeTail(handled_clients, c);
}
}
if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) {
/* A client was unlink from the list possibly making the next node invalid */
next = listFirst(server.clients_pending_io_read);
}
}

processClientsCommandsBatch();
processClientsCommandsBatch(handled_clients);

listIter handled_li;
listNode *handled_ln;
listRewind(handled_clients, &handled_li);
while ((handled_ln = listNext(&handled_li))) {
client *c = listNodeValue(handled_ln);
connUpdateState(c->conn);
}
listRelease(handled_clients);

return processed;
}
Expand Down
Loading