|
30 | 30 | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
31 | 31 | #include "cloud/config.h" |
32 | 32 | #include "common/compiler_util.h" // IWYU pragma: keep |
| 33 | +#include "common/config.h" |
33 | 34 | #include "common/status.h" |
34 | 35 | #include "core/block/column_with_type_and_name.h" |
35 | 36 | #include "core/column/column_nullable.h" |
@@ -386,6 +387,16 @@ Status BlockReader::_replace_key_next_block(Block* block, bool* eof) { |
386 | 387 | break; |
387 | 388 | } |
388 | 389 | } |
| 390 | + // Byte-budget check: after the inner loop _next_row is either EOF or the next different |
| 391 | + // key, so it is safe to stop accumulating here without repeating any row. |
| 392 | + if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && |
| 393 | + Block::columns_byte_size(target_columns) >= |
| 394 | + _reader_context.preferred_block_size_bytes) { |
| 395 | + if (UNLIKELY(_reader_context.record_rowids)) { |
| 396 | + _block_row_locations.resize(target_block_row); |
| 397 | + } |
| 398 | + break; |
| 399 | + } |
389 | 400 | } |
390 | 401 | _merged_rows += merged_row; |
391 | 402 | return Status::OK(); |
@@ -480,6 +491,14 @@ Status BlockReader::_agg_key_next_block(Block* block, bool* eof) { |
480 | 491 | if (target_block_row == _reader_context.batch_size) { |
481 | 492 | break; |
482 | 493 | } |
| 494 | + // Byte-budget check at group boundary: _next_row is the first row of the new group |
| 495 | + // and is still pending (not yet inserted), so stopping here is safe. |
| 496 | + if (config::enable_adaptive_batch_size && |
| 497 | + _reader_context.preferred_block_size_bytes > 0 && |
| 498 | + Block::columns_byte_size(target_columns) >= |
| 499 | + _reader_context.preferred_block_size_bytes) { |
| 500 | + break; |
| 501 | + } |
483 | 502 | _agg_data_counters.push_back(_last_agg_data_counter); |
484 | 503 | _last_agg_data_counter = 0; |
485 | 504 |
|
@@ -539,6 +558,15 @@ Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { |
539 | 558 | LOG(WARNING) << "next failed: " << res; |
540 | 559 | return res; |
541 | 560 | } |
| 561 | + // Byte-budget check: _next_row is already saved so stopping here is safe. |
| 562 | + if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 && |
| 563 | + Block::columns_byte_size(target_columns) >= |
| 564 | + _reader_context.preferred_block_size_bytes) { |
| 565 | + if (UNLIKELY(_reader_context.record_rowids)) { |
| 566 | + _block_row_locations.resize(target_block_row); |
| 567 | + } |
| 568 | + break; |
| 569 | + } |
542 | 570 | } while (target_block_row < _reader_context.batch_size); |
543 | 571 |
|
544 | 572 | if (_delete_sign_available) { |
|
0 commit comments