Skip to content

Commit 25a41b7

Browse files
committed
opt for StringHashTable's for_each
update update
1 parent 4c78bad commit 25a41b7

File tree

7 files changed

+451
-86
lines changed

7 files changed

+451
-86
lines changed

be/src/pipeline/exec/aggregation_sink_operator.cpp

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -565,10 +565,9 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
565565
};
566566

567567
SCOPED_TIMER(_hash_table_emplace_timer);
568-
for (size_t i = 0; i < num_rows; ++i) {
569-
places[i] = *agg_method.lazy_emplace(state, i, creator,
570-
creator_for_null_key);
571-
}
568+
vectorized::lazy_emplace_batch(
569+
agg_method, state, num_rows, creator, creator_for_null_key,
570+
[&](uint32_t row, auto& mapped) { places[row] = mapped; });
572571

573572
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
574573
}},
@@ -650,10 +649,10 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
650649
};
651650

652651
SCOPED_TIMER(_hash_table_emplace_timer);
653-
for (i = 0; i < num_rows; ++i) {
654-
places[i] = *agg_method.lazy_emplace(state, i, creator,
655-
creator_for_null_key);
656-
}
652+
vectorized::lazy_emplace_batch(
653+
agg_method, state, num_rows, creator, creator_for_null_key,
654+
[&](uint32_t row) { i = row; },
655+
[&](uint32_t row, auto& mapped) { places[row] = mapped; });
657656
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
658657
return true;
659658
}
@@ -665,27 +664,26 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
665664
void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places,
666665
vectorized::ColumnRawPtrs& key_columns,
667666
uint32_t num_rows) {
668-
std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
669-
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
670-
"uninited hash table");
671-
},
672-
[&](auto& agg_method) -> void {
673-
using HashMethodType = std::decay_t<decltype(agg_method)>;
674-
using AggState = typename HashMethodType::State;
675-
AggState state(key_columns);
676-
agg_method.init_serialized_keys(key_columns, num_rows);
677-
678-
/// For all rows.
679-
for (size_t i = 0; i < num_rows; ++i) {
680-
auto find_result = agg_method.find(state, i);
681-
682-
if (find_result.is_found()) {
683-
places[i] = find_result.get_mapped();
684-
} else {
685-
places[i] = nullptr;
686-
}
687-
}
688-
}},
667+
std::visit(vectorized::Overload {
668+
[&](std::monostate& arg) -> void {
669+
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
670+
},
671+
[&](auto& agg_method) -> void {
672+
using HashMethodType = std::decay_t<decltype(agg_method)>;
673+
using AggState = typename HashMethodType::State;
674+
AggState state(key_columns);
675+
agg_method.init_serialized_keys(key_columns, num_rows);
676+
677+
/// For all rows.
678+
vectorized::find_batch(agg_method, state, num_rows,
679+
[&](uint32_t row, auto& find_result) {
680+
if (find_result.is_found()) {
681+
places[row] = find_result.get_mapped();
682+
} else {
683+
places[row] = nullptr;
684+
}
685+
});
686+
}},
689687
_agg_data->method_variant);
690688
}
691689

be/src/pipeline/exec/aggregation_source_operator.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -587,10 +587,9 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
587587
};
588588

589589
SCOPED_TIMER(_hash_table_emplace_timer);
590-
for (size_t i = 0; i < num_rows; ++i) {
591-
places[i] = *agg_method.lazy_emplace(state, i, creator,
592-
creator_for_null_key);
593-
}
590+
vectorized::lazy_emplace_batch(
591+
agg_method, state, num_rows, creator, creator_for_null_key,
592+
[&](uint32_t row, auto& mapped) { places[row] = mapped; });
594593

595594
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
596595
COUNTER_SET(_hash_table_memory_usage,

be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,9 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
310310
auto creator_for_null_key = [&]() { distinct_row.push_back(row); };
311311

312312
SCOPED_TIMER(_hash_table_emplace_timer);
313-
for (; row < num_rows; ++row) {
314-
agg_method.lazy_emplace(state, row, creator, creator_for_null_key);
315-
}
313+
vectorized::lazy_emplace_batch_void(agg_method, state, num_rows, creator,
314+
creator_for_null_key,
315+
[&](uint32_t r) { row = r; });
316316

317317
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
318318
}},

be/src/pipeline/exec/streaming_aggregation_operator.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -728,10 +728,10 @@ bool StreamingAggLocalState::_emplace_into_hash_table_limit(vectorized::Aggregat
728728
};
729729

730730
SCOPED_TIMER(_hash_table_emplace_timer);
731-
for (i = 0; i < num_rows; ++i) {
732-
places[i] = *agg_method.lazy_emplace(state, i, creator,
733-
creator_for_null_key);
734-
}
731+
vectorized::lazy_emplace_batch(
732+
agg_method, state, num_rows, creator, creator_for_null_key,
733+
[&](uint32_t row) { i = row; },
734+
[&](uint32_t row, auto& mapped) { places[row] = mapped; });
735735
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
736736
return true;
737737
}
@@ -807,10 +807,9 @@ void StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
807807
};
808808

809809
SCOPED_TIMER(_hash_table_emplace_timer);
810-
for (size_t i = 0; i < num_rows; ++i) {
811-
places[i] = *agg_method.lazy_emplace(state, i, creator,
812-
creator_for_null_key);
813-
}
810+
vectorized::lazy_emplace_batch(
811+
agg_method, state, num_rows, creator, creator_for_null_key,
812+
[&](uint32_t row, auto& mapped) { places[row] = mapped; });
814813

815814
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
816815
}},

be/src/pipeline/rec_cte_shared_state.h

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -57,38 +57,36 @@ struct RecCTESharedState : public BasicSharedState {
5757
raw_columns.push_back(col.get());
5858
}
5959

60-
std::visit(vectorized::Overload {
61-
[&](std::monostate& arg) -> void {
62-
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
63-
"uninited hash table");
64-
},
65-
[&](auto& agg_method) -> void {
66-
SCOPED_TIMER(hash_table_compute_timer);
67-
using HashMethodType = std::decay_t<decltype(agg_method)>;
68-
using AggState = typename HashMethodType::State;
69-
70-
AggState agg_state(raw_columns);
71-
agg_method.init_serialized_keys(raw_columns, num_rows);
72-
distinct_row.clear();
73-
74-
size_t row = 0;
75-
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
76-
HashMethodType::try_presis_key(key, origin, arena);
77-
ctor(key);
78-
distinct_row.push_back(row);
79-
};
80-
auto creator_for_null_key = [&]() {
81-
distinct_row.push_back(row);
82-
};
83-
84-
SCOPED_TIMER(hash_table_emplace_timer);
85-
for (; row < num_rows; ++row) {
86-
agg_method.lazy_emplace(agg_state, row, creator,
87-
creator_for_null_key);
88-
}
89-
COUNTER_UPDATE(hash_table_input_counter, num_rows);
90-
}},
91-
agg_data->method_variant);
60+
std::visit(
61+
vectorized::Overload {
62+
[&](std::monostate& arg) -> void {
63+
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
64+
"uninited hash table");
65+
},
66+
[&](auto& agg_method) -> void {
67+
SCOPED_TIMER(hash_table_compute_timer);
68+
using HashMethodType = std::decay_t<decltype(agg_method)>;
69+
using AggState = typename HashMethodType::State;
70+
71+
AggState agg_state(raw_columns);
72+
agg_method.init_serialized_keys(raw_columns, num_rows);
73+
distinct_row.clear();
74+
75+
size_t row = 0;
76+
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
77+
HashMethodType::try_presis_key(key, origin, arena);
78+
ctor(key);
79+
distinct_row.push_back(row);
80+
};
81+
auto creator_for_null_key = [&]() { distinct_row.push_back(row); };
82+
83+
SCOPED_TIMER(hash_table_emplace_timer);
84+
vectorized::lazy_emplace_batch_void(agg_method, agg_state, num_rows,
85+
creator, creator_for_null_key,
86+
[&](uint32_t r) { row = r; });
87+
COUNTER_UPDATE(hash_table_input_counter, num_rows);
88+
}},
89+
agg_data->method_variant);
9290

9391
if (distinct_row.size() == block.rows()) {
9492
blocks.emplace_back(std::move(block));

0 commit comments

Comments
 (0)