|
27 | 27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | 28 | */ |
29 | 29 |
|
| 30 | +#include "adlist.h" |
| 31 | +#include "list.h" |
30 | 32 | #include "server.h" |
31 | 33 | #include "cluster.h" |
32 | 34 | #include "cluster_slot_stats.h" |
@@ -6373,16 +6375,15 @@ int processIOThreadsReadDone(void) { |
6373 | 6375 | if (ProcessingEventsWhileBlocked) { |
6374 | 6376 | /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. |
6375 | 6377 | * In this case, there may be some clients left in the batch waiting to be processed. */ |
6376 | | - processClientsCommandsBatch(); |
| 6378 | + processClientsCommandsBatch(NULL); |
6377 | 6379 | } |
6378 | 6380 |
|
6379 | 6381 | if (listLength(server.clients_pending_io_read) == 0) return 0; |
6380 | 6382 | int processed = 0; |
6381 | 6383 | /* Collect clients handled in this iteration to continue parsing any |
6382 | 6384 | * residual buffered data synchronously after batch execution (important |
6383 | | - * for edge-triggered transports like RDMA). Use IDs instead of raw client |
6384 | | - * pointers to avoid UAF if a client is freed while executing the batch. */ |
6385 | | - list *handled_clients_ids = listCreate(); |
| 6385 | + * for edge-triggered transports like RDMA). */ |
| 6386 | + list *handled_clients = listCreate(); |
6386 | 6387 | listNode *ln; |
6387 | 6388 |
|
6388 | 6389 | listNode *next = listFirst(server.clients_pending_io_read); |
@@ -6444,39 +6445,30 @@ int processIOThreadsReadDone(void) { |
6444 | 6445 |
|
6445 | 6446 | size_t list_length_before_command_execute = listLength(server.clients_pending_io_read); |
6446 | 6447 | /* try to add the command to the batch */ |
6447 | | - int ret = addCommandToBatchAndProcessIfFull(c); |
| 6448 | + int ret = addCommandToBatchAndProcessIfFull(c, handled_clients); |
6448 | 6449 | /* If the command was not added to the commands batch, process it immediately */ |
6449 | 6450 | if (ret == C_ERR) { |
6450 | | - if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c); |
| 6451 | + if (processPendingCommandAndInputBuffer(c) == C_OK) { |
| 6452 | + beforeNextClient(c); |
| 6453 | + listAddNodeTail(handled_clients,c); |
| 6454 | + } |
6451 | 6455 | } |
6452 | 6456 | if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) { |
6453 | 6457 | /* A client was unlinked from the list, possibly making the next node invalid */ |
6454 | 6458 | next = listFirst(server.clients_pending_io_read); |
6455 | 6459 | } |
6456 | | - |
6457 | | - /* Track the client by ID so we can drain any leftover buffered data even if no |
6458 | | - * further network events arrive. */ |
6459 | | - listAddNodeTail(handled_clients_ids, (void *)(uintptr_t)c->id); |
6460 | 6460 | } |
6461 | 6461 |
|
6462 | | - processClientsCommandsBatch(); |
| 6462 | + processClientsCommandsBatch(handled_clients); |
6463 | 6463 |
|
6464 | | - /* For clients handled in this iteration, if residual data remains in |
6465 | | - * querybuf, handle it now to avoid edge-triggered lost wakeups. Clients may |
6466 | | - * have been freed while executing the batch, so re-lookup by ID. */ |
6467 | 6464 | listIter handled_li; |
6468 | 6465 | listNode *handled_ln; |
6469 | | - listRewind(handled_clients_ids, &handled_li); |
| 6466 | + listRewind(handled_clients, &handled_li); |
6470 | 6467 | while ((handled_ln = listNext(&handled_li))) { |
6471 | | - uint64_t id = (uint64_t)(uintptr_t)listNodeValue(handled_ln); |
6472 | | - client *c = lookupClientByID(id); |
6473 | | - if (!c) continue; |
6474 | | - |
6475 | | - if (connUpdateState(c->conn)) { |
6476 | | - processPendingCommandAndInputBuffer(c); /* try to handle new arrival data if possible */ |
6477 | | - } |
| 6468 | + client *c = listNodeValue(handled_ln); |
| 6469 | + connUpdateState(c->conn); |
6478 | 6470 | } |
6479 | | - listRelease(handled_clients_ids); |
| 6471 | + listRelease(handled_clients); |
6480 | 6472 |
|
6481 | 6473 | return processed; |
6482 | 6474 | } |
|
0 commit comments