diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.cpp b/be/src/exec/operator/nested_loop_join_probe_operator.cpp index 7a3be55cbb1988..af4fd9806336cf 100644 --- a/be/src/exec/operator/nested_loop_join_probe_operator.cpp +++ b/be/src/exec/operator/nested_loop_join_probe_operator.cpp @@ -23,7 +23,9 @@ #include "common/exception.h" #include "core/block/block.h" #include "core/column/column.h" +#include "core/column/column_const.h" #include "core/column/column_filter_helper.h" +#include "core/column/column_nullable.h" #include "exec/operator/operator.h" namespace doris { @@ -31,6 +33,61 @@ class RuntimeState; } // namespace doris namespace doris { +namespace { +constexpr int8_t MARK_FALSE = 0; +constexpr int8_t MARK_TRUE = 1; +constexpr int8_t MARK_NULL = -1; + +ColumnPtr make_const_column_from_row(const ColumnWithTypeAndName& source, size_t row, size_t rows) { + return ColumnConst::create(source.column->cut(row, 1), rows); +} + +ColumnPtr align_eval_column_nullable(const ColumnWithTypeAndName& target, const ColumnPtr& column) { + if (target.type->is_nullable() && !column->is_nullable()) { + return make_nullable(column); + } + return column; +} + +void append_many_from_source(MutableColumnPtr& dst_column, const ColumnWithTypeAndName& src_column, + size_t row, size_t rows) { + if (!src_column.column->is_nullable() && dst_column->is_nullable()) { + const auto origin_size = dst_column->size(); + auto* nullable_column = assert_cast(dst_column.get()); + nullable_column->get_nested_column_ptr()->insert_many_from(*src_column.column, row, rows); + nullable_column->get_null_map_column().get_data().resize_fill(origin_size + rows, 0); + } else { + dst_column->insert_many_from(*src_column.column, row, rows); + } +} + +void append_filtered_from_source(MutableColumnPtr& dst_column, + const ColumnWithTypeAndName& src_column, + const IColumn::Filter& filter, size_t selected_rows) { + if (selected_rows == 0) { + return; + } + auto filtered_column = src_column.column->filter(filter, selected_rows); + if (!src_column.column->is_nullable() && dst_column->is_nullable()) { + const auto origin_size = dst_column->size(); + auto* nullable_column = assert_cast(dst_column.get()); + nullable_column->get_nested_column_ptr()->insert_range_from(*filtered_column, 0, + selected_rows); + nullable_column->get_null_map_column().get_data().resize_fill(origin_size + selected_rows, + 0); + } else { + dst_column->insert_range_from(*filtered_column, 0, selected_rows); + } +} + +void append_mark_value(MutableColumnPtr& dst_column, int8_t mark_value) { + auto* nullable_column = assert_cast(dst_column.get()); + auto& value_column = assert_cast(nullable_column->get_nested_column()); + value_column.get_data().push_back(mark_value == MARK_TRUE); + nullable_column->get_null_map_column().get_data().push_back(mark_value == MARK_NULL); +} +} // namespace + NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) : JoinProbeLocalState(state, @@ -59,6 +116,10 @@ Status NestedLoopJoinProbeLocalState::open(RuntimeState* state) { for (size_t i = 0; i < _join_conjuncts.size(); i++) { RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, _join_conjuncts[i])); } + _mark_join_conjuncts.resize(p._mark_join_conjuncts.size()); + for (size_t i = 0; i < _mark_join_conjuncts.size(); i++) { + RETURN_IF_ERROR(p._mark_join_conjuncts[i]->clone(state, _mark_join_conjuncts[i])); + } _construct_mutable_join_block(); return Status::OK(); } @@ -177,6 +238,512 @@ void process_build_block(int64_t build_block_pos, Block& block, const Block& bui block.set_columns(std::move(dst_columns)); } +void NestedLoopJoinProbeLocalState::_replace_lazy_placeholder_columns(size_t rows) { + auto& p = _parent->cast(); + for (size_t i = 0; i < _join_block.columns(); ++i) { + if (p._materialize_column_ids.find(cast_set(i)) != p._materialize_column_ids.end()) { + continue; + } + const auto& column = _join_block.get_by_position(i); + _join_block.replace_by_position(i, + column.type->create_column_const_with_default_value(rows)); + } +} + +Status NestedLoopJoinProbeLocalState::_append_lazy_rows(const IColumn::Filter& filter, + size_t selected_rows, bool fixed_side_probe, + int64_t fixed_side_pos, + const Block& probe_block, + const Block& build_block) { + auto& p = _parent->cast(); + const size_t old_rows = _join_block.rows(); + const size_t new_rows = old_rows + selected_rows; + + if (p._materialize_column_ids.empty()) { + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); + } + + auto dst_columns = _join_block.mutate_columns(); + for (int column_id : p._materialize_column_ids) { + const auto column_idx = cast_set(column_id); + if (column_idx < p._num_probe_side_columns) { + const auto& src_column = probe_block.get_by_position(column_idx); + if (fixed_side_probe) { + append_many_from_source(dst_columns[column_idx], src_column, fixed_side_pos, + selected_rows); + } else { + append_filtered_from_source(dst_columns[column_idx], src_column, filter, + selected_rows); + } + } else { + const auto build_column_idx = column_idx - p._num_probe_side_columns; + const auto& src_column = build_block.get_by_position(build_column_idx); + if (fixed_side_probe) { + append_filtered_from_source(dst_columns[column_idx], src_column, filter, + selected_rows); + } else { + append_many_from_source(dst_columns[column_idx], src_column, fixed_side_pos, + selected_rows); + } + } + } + _join_block.set_columns(std::move(dst_columns)); + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_append_lazy_probe_row_with_build_defaults( + const Block& probe_block, int64_t probe_row_pos) { + auto& p = _parent->cast(); + const size_t new_rows = _join_block.rows() + 1; + + if (p._materialize_column_ids.empty()) { + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); + } + + auto dst_columns = _join_block.mutate_columns(); + for (int column_id : p._materialize_column_ids) { + const auto column_idx = cast_set(column_id); + if (column_idx < p._num_probe_side_columns) { + const auto& src_column = probe_block.get_by_position(column_idx); + append_many_from_source(dst_columns[column_idx], src_column, probe_row_pos, 1); + } else { + dst_columns[column_idx]->insert_many_defaults(1); + } + } + _join_block.set_columns(std::move(dst_columns)); + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_append_lazy_mark_probe_row_with_build_defaults( + const Block& probe_block, int64_t probe_row_pos, int8_t mark_value) { + auto& p = _parent->cast(); + const size_t mark_column_id = p._num_probe_side_columns + p._num_build_side_columns; + const size_t new_rows = _join_block.rows() + 1; + + auto dst_columns = _join_block.mutate_columns(); + for (int column_id : p._materialize_column_ids) { + const auto column_idx = cast_set(column_id); + if (column_idx < p._num_probe_side_columns) { + const auto& src_column = probe_block.get_by_position(column_idx); + append_many_from_source(dst_columns[column_idx], src_column, probe_row_pos, 1); + } else if (column_idx == mark_column_id) { + append_mark_value(dst_columns[column_idx], mark_value); + } else { + dst_columns[column_idx]->insert_many_defaults(1); + } + } + _join_block.set_columns(std::move(dst_columns)); + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_append_lazy_build_rows_with_probe_defaults( + const Block& build_block, const IColumn::Filter& filter, size_t selected_rows) { + auto& p = _parent->cast(); + const size_t new_rows = _join_block.rows() + selected_rows; + + if (p._materialize_column_ids.empty()) { + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); + } + + auto dst_columns = _join_block.mutate_columns(); + for (int column_id : p._materialize_column_ids) { + const auto column_idx = cast_set(column_id); + if (column_idx < p._num_probe_side_columns) { + dst_columns[column_idx]->insert_many_defaults(selected_rows); + } else { + const auto build_column_idx = column_idx - p._num_probe_side_columns; + const auto& src_column = build_block.get_by_position(build_column_idx); + append_filtered_from_source(dst_columns[column_idx], src_column, filter, selected_rows); + } + } + _join_block.set_columns(std::move(dst_columns)); + _replace_lazy_placeholder_columns(new_rows); + DCHECK_EQ(_join_block.rows(), new_rows); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_finalize_lazy_probe_row(RuntimeState* state, + const Block& probe_block, + int64_t probe_row_pos) { + auto& p = _parent->cast(); + if ((!p._enable_lazy_probe_finalize && !p._enable_lazy_mark_finalize) || probe_row_pos < 0 || + cast_set(probe_row_pos) >= probe_block.rows()) { + return Status::OK(); + } + if (_join_block.rows() >= state->batch_size()) { + return Status::OK(); + } + + if (p._enable_lazy_mark_finalize) { + int8_t mark_value = _cur_probe_row_mark_flags[probe_row_pos]; + if (p._join_op == TJoinOp::LEFT_ANTI_JOIN && mark_value != MARK_NULL) { + mark_value = mark_value == MARK_TRUE ? MARK_FALSE : MARK_TRUE; + } + RETURN_IF_ERROR(_append_lazy_mark_probe_row_with_build_defaults(probe_block, probe_row_pos, + mark_value)); + return Status::OK(); + } + + const bool matched = _cur_probe_row_visited_flags[probe_row_pos]; + if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) { + if (!matched) { + RETURN_IF_ERROR(_append_lazy_probe_row_with_build_defaults(probe_block, probe_row_pos)); + } + } else if (p._join_op == TJoinOp::LEFT_SEMI_JOIN) { + if (matched) { + RETURN_IF_ERROR(_append_lazy_probe_row_with_build_defaults(probe_block, probe_row_pos)); + } + } else if (p._join_op == TJoinOp::LEFT_ANTI_JOIN || + p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (!matched) { + RETURN_IF_ERROR(_append_lazy_probe_row_with_build_defaults(probe_block, probe_row_pos)); + } + } + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_finalize_lazy_build_side(RuntimeState* state) { + auto& p = _parent->cast(); + if (!p._enable_lazy_build_finalize) { + return Status::OK(); + } + + while (_join_block.rows() < state->batch_size() && + _output_null_idx_build_side < _shared_state->build_blocks.size()) { + const auto& build_block = _shared_state->build_blocks[_output_null_idx_build_side]; + const auto* __restrict visited_flags = + assert_cast( + _shared_state->build_side_visited_flags[_output_null_idx_build_side].get()) + ->get_data() + .data(); + const size_t rows = build_block.rows(); + const size_t output_capacity = state->batch_size() - _join_block.rows(); + + IColumn::Filter filter(rows, 0); + auto* __restrict filter_data = filter.data(); + size_t selected_rows = 0; + size_t row_idx = _output_null_row_idx_build_side; + for (; row_idx < rows && selected_rows < output_capacity; ++row_idx) { + const bool matched = visited_flags[row_idx] != 0; + const bool selected = p._join_op == TJoinOp::RIGHT_SEMI_JOIN ? matched : !matched; + filter_data[row_idx] = selected; + selected_rows += selected; + } + + if (selected_rows > 0) { + RETURN_IF_ERROR(_append_lazy_build_rows_with_probe_defaults(build_block, filter, + selected_rows)); + } + if (row_idx == rows) { + ++_output_null_idx_build_side; + _output_null_row_idx_build_side = 0; + } else { + _output_null_row_idx_build_side = row_idx; + } + } + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_advance_lazy_probe_row(RuntimeState* state, + const Block& probe_block) { + if (_current_build_pos == _shared_state->build_blocks.size() && + _probe_block_pos < probe_block.rows()) { + RETURN_IF_ERROR(_finalize_lazy_probe_row(state, probe_block, _probe_block_pos)); + } + + if (_probe_block_pos < probe_block.rows()) { + _probe_side_process_count++; + } + + _reset_with_next_probe_row(); + if (_probe_block_pos < probe_block.rows()) { + return Status::OK(); + } + + if (_shared_state->probe_side_eos) { + _matched_rows_done = true; + } else { + _need_more_input_data = true; + } + return Status::OK(); +} + +void NestedLoopJoinProbeLocalState::_append_lazy_probe_eval_columns( + ColumnsWithTypeAndName& eval_columns, const Block& probe_block, bool fixed_side_probe, + int64_t fixed_side_pos, size_t rows) { + auto& p = _parent->cast(); + for (size_t i = 0; i < p._num_probe_side_columns; ++i) { + const auto& block_column = _join_block.get_by_position(i); + const auto& src_column = probe_block.get_by_position(i); + if (p._lazy_eval_column_ids.find(cast_set(i)) == p._lazy_eval_column_ids.end()) { + eval_columns.emplace_back( + block_column.type->create_column_const_with_default_value(rows), + block_column.type, block_column.name); + } else if (fixed_side_probe) { + eval_columns.emplace_back( + align_eval_column_nullable( + block_column, + make_const_column_from_row(src_column, fixed_side_pos, rows)), + block_column.type, block_column.name); + } else { + eval_columns.emplace_back(align_eval_column_nullable(block_column, src_column.column), + block_column.type, block_column.name); + } + } +} + +void NestedLoopJoinProbeLocalState::_append_lazy_build_eval_columns( + ColumnsWithTypeAndName& eval_columns, const Block& build_block, bool fixed_side_probe, + int64_t fixed_side_pos, size_t rows) { + auto& p = _parent->cast(); + for (size_t i = 0; i < p._num_build_side_columns; ++i) { + const auto column_idx = p._num_probe_side_columns + i; + const auto& block_column = _join_block.get_by_position(column_idx); + const auto& src_column = build_block.get_by_position(i); + if (p._lazy_eval_column_ids.find(cast_set(column_idx)) == + p._lazy_eval_column_ids.end()) { + eval_columns.emplace_back( + block_column.type->create_column_const_with_default_value(rows), + block_column.type, block_column.name); + } else if (fixed_side_probe) { + eval_columns.emplace_back(align_eval_column_nullable(block_column, src_column.column), + block_column.type, block_column.name); + } else { + eval_columns.emplace_back( + align_eval_column_nullable( + block_column, + make_const_column_from_row(src_column, fixed_side_pos, rows)), + block_column.type, block_column.name); + } + } +} + +bool NestedLoopJoinProbeLocalState::_should_delay_lazy_probe_build_block(size_t candidate_rows, + size_t batch_size) const { + return _lazy_should_output_matched_rows() && _join_block.rows() + candidate_rows > batch_size; +} + +bool NestedLoopJoinProbeLocalState::_lazy_should_output_matched_rows() const { + auto& p = _parent->cast(); + return p._join_op == TJoinOp::INNER_JOIN || p._join_op == TJoinOp::CROSS_JOIN || + p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::FULL_OUTER_JOIN; +} + +void NestedLoopJoinProbeLocalState::_mark_lazy_build_rows_visited(size_t build_block_idx, + const IColumn::Filter& filter) { + auto& p = _parent->cast(); + if (!p._enable_lazy_build_finalize) { + return; + } + + auto& build_side_flag = assert_cast( + _shared_state->build_side_visited_flags[build_block_idx].get()) + ->get_data(); + auto* __restrict build_side_flag_data = build_side_flag.data(); + const auto* __restrict filter_data = filter.data(); + DCHECK_EQ(build_side_flag.size(), filter.size()); + for (size_t i = 0; i < filter.size(); ++i) { + build_side_flag_data[i] |= filter_data[i]; + } +} + +void NestedLoopJoinProbeLocalState::_update_lazy_mark_join_state( + const IColumn::Filter& mark_filter, const ColumnUInt8& mark_null_map, + const IColumn::Filter& other_filter) { + auto& mark_state = _cur_probe_row_mark_flags[_probe_block_pos]; + DCHECK_EQ(mark_filter.size(), mark_null_map.size()); + DCHECK_EQ(mark_filter.size(), other_filter.size()); + if (mark_state == MARK_TRUE) { + return; + } + + const auto* __restrict mark_filter_data = mark_filter.data(); + const auto* __restrict mark_null_data = mark_null_map.get_data().data(); + const auto* __restrict other_filter_data = other_filter.data(); + + for (size_t i = 0; i < mark_filter.size(); ++i) { + if (!other_filter_data[i]) { + continue; + } + if (mark_null_data[i]) { + mark_state = MARK_NULL; + } else if (mark_filter_data[i]) { + mark_state = MARK_TRUE; + return; + } + } +} + +Status NestedLoopJoinProbeLocalState::_process_lazy_probe_build_block(Block* probe_block, + const Block& build_block, + size_t build_block_idx, + bool ignore_null) { + auto& p = _parent->cast(); + // A TRUE mark is terminal for lazy MARK LEFT SEMI/ANTI joins. + if (p._enable_lazy_mark_finalize && _cur_probe_row_mark_flags[_probe_block_pos] == MARK_TRUE) { + return Status::OK(); + } + + const size_t candidate_rows = build_block.rows(); + if (candidate_rows == 0) { + return Status::OK(); + } + + ColumnsWithTypeAndName eval_columns; + eval_columns.reserve(_join_block.columns()); + _append_lazy_probe_eval_columns(eval_columns, *probe_block, true, _probe_block_pos, + candidate_rows); + _append_lazy_build_eval_columns(eval_columns, build_block, true, 0, candidate_rows); + Block eval_block(std::move(eval_columns)); + + IColumn::Filter filter(candidate_rows, 1); + bool can_filter_all = false; + { + SCOPED_TIMER(_join_conjuncts_evaluation_timer); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_join_conjuncts, nullptr, ignore_null, + &eval_block, &filter, &can_filter_all)); + } + if (can_filter_all) { + return Status::OK(); + } + + const size_t selected_rows = + candidate_rows - + simd::count_zero_num(reinterpret_cast(filter.data()), candidate_rows); + DCHECK_GT(selected_rows, 0); + + if (p._enable_lazy_mark_finalize) { + if (_mark_join_conjuncts.empty()) { + _cur_probe_row_mark_flags[_probe_block_pos] = MARK_TRUE; + } else { + IColumn::Filter mark_filter(candidate_rows, 1); + auto mark_null_map = ColumnUInt8::create(candidate_rows, 0); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_mark_join_conjuncts, &eval_block, + *mark_null_map, mark_filter)); + _update_lazy_mark_join_state(mark_filter, *mark_null_map, filter); + } + } else if (p._enable_lazy_probe_finalize) { + _cur_probe_row_visited_flags[_probe_block_pos] = true; + } + _mark_lazy_build_rows_visited(build_block_idx, filter); + if (_lazy_should_output_matched_rows()) { + RETURN_IF_ERROR(_append_lazy_rows(filter, selected_rows, true, _probe_block_pos, + *probe_block, build_block)); + } + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_generate_lazy_block_base_probe(RuntimeState* state, + Block* probe_block, + bool ignore_null) { + auto& p = _parent->cast(); + while (_join_block.rows() < state->batch_size()) { + while (_current_build_pos == _shared_state->build_blocks.size() || + _probe_block_pos == probe_block->rows()) { + RETURN_IF_ERROR(_advance_lazy_probe_row(state, *probe_block)); + if (_probe_block_pos >= probe_block->rows()) { + break; + } + } + + if (_matched_rows_done || _need_more_input_data) { + break; + } + + const size_t build_block_idx = _current_build_pos++; + const auto& build_block = _shared_state->build_blocks[build_block_idx]; + if (_should_delay_lazy_probe_build_block(build_block.rows(), state->batch_size())) { + --_current_build_pos; + break; + } + RETURN_IF_ERROR(_process_lazy_probe_build_block(probe_block, build_block, build_block_idx, + ignore_null)); + if (p._enable_lazy_mark_finalize && + _cur_probe_row_mark_flags[_probe_block_pos] == MARK_TRUE) { + _current_build_pos = _shared_state->build_blocks.size(); + } + } + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::_generate_lazy_block_base_build(RuntimeState* state, + Block* probe_block) { + DCHECK(use_generate_block_base_build()); + const auto& build_block = _shared_state->build_blocks[0]; + const size_t build_rows = build_block.rows(); + const auto probe_rows = static_cast(probe_block->rows()); + + if (probe_rows == 0) { + if (_shared_state->probe_side_eos) { + _matched_rows_done = true; + } else { + _need_more_input_data = true; + } + return Status::OK(); + } + + size_t processed_rows = 0; + while (processed_rows + probe_rows <= state->batch_size()) { + if (_probe_block_pos == probe_rows) { + _current_build_row_pos++; + _probe_block_pos = 0; + + if (_current_build_row_pos >= build_rows) { + if (_shared_state->probe_side_eos) { + _matched_rows_done = true; + } else { + _need_more_input_data = true; + _current_build_row_pos = 0; + } + break; + } + } + + if (_matched_rows_done || _need_more_input_data) { + break; + } + + processed_rows += probe_rows; + ColumnsWithTypeAndName eval_columns; + eval_columns.reserve(_join_block.columns()); + _append_lazy_probe_eval_columns(eval_columns, *probe_block, false, 0, probe_rows); + _append_lazy_build_eval_columns(eval_columns, build_block, false, _current_build_row_pos, + probe_rows); + Block eval_block(std::move(eval_columns)); + + IColumn::Filter filter(probe_rows, 1); + bool can_filter_all = false; + { + SCOPED_TIMER(_join_conjuncts_evaluation_timer); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_join_conjuncts, nullptr, false, + &eval_block, &filter, &can_filter_all)); + } + if (!can_filter_all) { + const size_t selected_rows = + probe_rows - + simd::count_zero_num(reinterpret_cast(filter.data()), probe_rows); + DCHECK_GT(selected_rows, 0); + RETURN_IF_ERROR(_append_lazy_rows(filter, selected_rows, false, _current_build_row_pos, + *probe_block, build_block)); + } + _probe_block_pos = probe_rows; + } + return Status::OK(); +} + template void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* state, Block* probe_block) { @@ -317,6 +884,17 @@ Status NestedLoopJoinProbeLocalState::generate_inner_join_block_data(RuntimeStat auto& p = _parent->cast(); auto* probe_block = _child_block.get(); + if (p._enable_lazy_materialize) { + if (!_matched_rows_done && !_need_more_input_data) { + if (use_generate_block_base_build()) { + RETURN_IF_ERROR(_generate_lazy_block_base_build(state, probe_block)); + } else { + RETURN_IF_ERROR(_generate_lazy_block_base_probe(state, probe_block, false)); + } + } + return Status::OK(); + } + if (!_matched_rows_done && !_need_more_input_data) { if (use_generate_block_base_build()) { _generate_block_base_build(state, probe_block); @@ -348,6 +926,16 @@ Status NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat auto& p = _parent->cast(); auto* probe_block = _child_block.get(); + if (p._enable_lazy_materialize) { + if (!_matched_rows_done && !_need_more_input_data) { + RETURN_IF_ERROR(_generate_lazy_block_base_probe(state, probe_block, ignore_null)); + } + if (_matched_rows_done) { + RETURN_IF_ERROR(_finalize_lazy_build_side(state)); + } + return Status::OK(); + } + if (!_matched_rows_done && !_need_more_input_data) { // We should try to join rows if there still are some rows from probe side. // _probe_offset_stack and _build_offset_stack use u16 for storage @@ -394,6 +982,7 @@ Status NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat } template +// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing finalization handles multiple join variants. void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block, size_t batch_size) { auto& p = _parent->cast(); auto dst_columns = block.mutate_columns(); @@ -545,7 +1134,11 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_probe_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), - _old_version_flag(!tnode.__isset.nested_loop_join_node) { + _has_materialized_slot_ids(tnode.__isset.nested_loop_join_node && + tnode.nested_loop_join_node.__isset.materialized_slot_ids), + _materialized_slot_ids(_has_materialized_slot_ids + ? tnode.nested_loop_join_node.materialized_slot_ids + : std::vector {}) { _keep_origin = _is_output_probe_side_only; } @@ -562,6 +1155,12 @@ Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* VExpr::create_expr_tree(tnode.nested_loop_join_node.vjoin_conjunct, context)); _join_conjuncts.emplace_back(context); } + if (tnode.nested_loop_join_node.__isset.mark_join_conjuncts && + !tnode.nested_loop_join_node.mark_join_conjuncts.empty()) { + RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.nested_loop_join_node.mark_join_conjuncts, + _mark_join_conjuncts)); + DORIS_CHECK(_is_mark_join); + } return Status::OK(); } @@ -571,9 +1170,46 @@ Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) { for (auto& conjunct : _join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } + for (auto& conjunct : _mark_join_conjuncts) { + RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); + } _num_probe_side_columns = _child->row_desc().num_materialized_slots(); _num_build_side_columns = _build_side_child->row_desc().num_materialized_slots(); - return VExpr::open(_join_conjuncts, state); + for (const auto& conjunct : _join_conjuncts) { + conjunct->root()->collect_slot_column_ids(_lazy_eval_column_ids); + } + for (const auto& conjunct : _mark_join_conjuncts) { + conjunct->root()->collect_slot_column_ids(_lazy_eval_column_ids); + } + if (_has_materialized_slot_ids) { + for (const auto slot_id : _materialized_slot_ids) { + const int column_id = intermediate_row_desc().get_column_id(slot_id); + DORIS_CHECK(column_id >= 0); + _materialize_column_ids.insert(column_id); + } + } + _enable_lazy_mark_finalize = _is_mark_join && (_join_op == TJoinOp::LEFT_SEMI_JOIN || + _join_op == TJoinOp::LEFT_ANTI_JOIN); + if (_enable_lazy_mark_finalize) { + _materialize_column_ids.insert( + cast_set(_num_probe_side_columns + _num_build_side_columns)); + } + _enable_lazy_probe_finalize = + _join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || + _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + _join_op == TJoinOp::FULL_OUTER_JOIN; + _enable_lazy_build_finalize = + _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN || + _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN; + bool supported_lazy_join = _join_op == TJoinOp::INNER_JOIN || _join_op == TJoinOp::CROSS_JOIN || + _enable_lazy_probe_finalize || _enable_lazy_build_finalize || + _enable_lazy_mark_finalize; + _enable_lazy_materialize = _has_materialized_slot_ids && !_is_output_probe_side_only && + (!_is_mark_join || _enable_lazy_mark_finalize) && + supported_lazy_join && !projections().empty() && + &projections_row_desc() == &intermediate_row_desc(); + RETURN_IF_ERROR(VExpr::open(_join_conjuncts, state)); + return VExpr::open(_mark_join_conjuncts, state); } bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { @@ -592,6 +1228,11 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo local_state._cur_probe_row_visited_flags.resize(block->rows()); std::fill(local_state._cur_probe_row_visited_flags.begin(), local_state._cur_probe_row_visited_flags.end(), 0); + if (_enable_lazy_mark_finalize) { + local_state._cur_probe_row_mark_flags.resize(block->rows()); + std::fill(local_state._cur_probe_row_mark_flags.begin(), + local_state._cur_probe_row_mark_flags.end(), MARK_FALSE); + } local_state._probe_block_pos = 0; local_state._need_more_input_data = false; local_state._shared_state->probe_side_eos = eos; @@ -621,6 +1262,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo return Status::OK(); } +// NOLINTNEXTLINE(readability-function-cognitive-complexity): existing pull dispatch handles all NLJ variants. Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, Block* block, bool* eos) const { auto& local_state = get_local_state(state); if (_is_output_probe_side_only) { diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.h b/be/src/exec/operator/nested_loop_join_probe_operator.h index aea6972f428bec..948e15b2d6c35a 100644 --- a/be/src/exec/operator/nested_loop_join_probe_operator.h +++ b/be/src/exec/operator/nested_loop_join_probe_operator.h @@ -17,12 +17,13 @@ #pragma once -#include - #include +#include +#include #include "common/cast_set.h" #include "common/status.h" +#include "core/column/column.h" #include "exec/operator/join_probe_operator.h" #include "exec/operator/operator.h" #include "util/simd/bits.h" @@ -77,6 +78,39 @@ class NestedLoopJoinProbeLocalState final private: // Whether to generate data based on the build side bool use_generate_block_base_build() const; + Status _advance_lazy_probe_row(RuntimeState* state, const Block& probe_block); + Status _generate_lazy_block_base_probe(RuntimeState* state, Block* probe_block, + bool ignore_null); + Status _generate_lazy_block_base_build(RuntimeState* state, Block* probe_block); + bool _should_delay_lazy_probe_build_block(size_t candidate_rows, size_t batch_size) const; + bool _lazy_should_output_matched_rows() const; + Status _process_lazy_probe_build_block(Block* probe_block, const Block& build_block, + size_t build_block_idx, bool ignore_null); + void _mark_lazy_build_rows_visited(size_t build_block_idx, const IColumn::Filter& filter); + void _update_lazy_mark_join_state(const IColumn::Filter& mark_filter, + const ColumnUInt8& mark_null_map, + const IColumn::Filter& other_filter); + Status _append_lazy_rows(const IColumn::Filter& filter, size_t selected_rows, + bool fixed_side_probe, int64_t fixed_side_pos, + const Block& probe_block, const Block& build_block); + Status _append_lazy_probe_row_with_build_defaults(const Block& probe_block, + int64_t probe_row_pos); + Status _append_lazy_mark_probe_row_with_build_defaults(const Block& probe_block, + int64_t probe_row_pos, + int8_t mark_value); + Status _finalize_lazy_probe_row(RuntimeState* state, const Block& probe_block, + int64_t probe_row_pos); + Status _append_lazy_build_rows_with_probe_defaults(const Block& build_block, + const IColumn::Filter& filter, + size_t selected_rows); + Status _finalize_lazy_build_side(RuntimeState* state); + void _replace_lazy_placeholder_columns(size_t rows); + void _append_lazy_probe_eval_columns(ColumnsWithTypeAndName& eval_columns, + const Block& probe_block, bool fixed_side_probe, + int64_t fixed_side_pos, size_t rows); + void _append_lazy_build_eval_columns(ColumnsWithTypeAndName& eval_columns, + const Block& build_block, bool fixed_side_probe, + int64_t fixed_side_pos, size_t rows); friend class NestedLoopJoinProbeOperatorX; void _update_additional_flags(Block* block); @@ -139,7 +173,7 @@ class NestedLoopJoinProbeLocalState final template Status _do_filtering_and_update_visited_flags(Block* block, bool materialize) { // The number of columns will not exceed the range of u32. - uint32_t column_to_keep = cast_set(block->columns()); + auto column_to_keep = cast_set(block->columns()); // If we need to set visited flags for build side, // 1. Execute conjuncts and get a column with bool type to do filtering. // 2. Use bool column to update build-side visited flags. @@ -205,6 +239,7 @@ class NestedLoopJoinProbeLocalState final bool _need_more_input_data = true; // Visited flags for current row in probe side. std::vector _cur_probe_row_visited_flags; + std::vector _cur_probe_row_mark_flags; size_t _current_build_pos = 0; size_t _current_build_row_pos = 0; // current row pos in build block, used by _generate_block_base_build @@ -212,7 +247,9 @@ class NestedLoopJoinProbeLocalState final std::stack _build_offset_stack; std::stack _probe_offset_stack; uint64_t _output_null_idx_build_side = 0; + uint64_t _output_null_row_idx_build_side = 0; VExprContextSPtrs _join_conjuncts; + VExprContextSPtrs _mark_join_conjuncts; RuntimeProfile::Counter* _loop_join_timer = nullptr; RuntimeProfile::Counter* _output_temp_blocks_timer = nullptr; @@ -232,7 +269,8 @@ class NestedLoopJoinProbeOperatorX final Status push(RuntimeState* state, Block* input_block, bool eos) const override; Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) const override; const RowDescriptor& intermediate_row_desc() const override { - return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; + DORIS_CHECK(_intermediate_row_desc != nullptr); + return *_intermediate_row_desc; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { @@ -245,9 +283,11 @@ class NestedLoopJoinProbeOperatorX final } const RowDescriptor& row_desc() const override { - return _old_version_flag - ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); + if (_output_row_descriptor) { + return *_output_row_descriptor; + } + DORIS_CHECK(_output_row_desc != nullptr); + return *_output_row_desc; } bool need_more_input_data(RuntimeState* state) const override; @@ -256,9 +296,17 @@ class NestedLoopJoinProbeOperatorX final friend class NestedLoopJoinProbeLocalState; bool _is_output_probe_side_only; VExprContextSPtrs _join_conjuncts; + VExprContextSPtrs _mark_join_conjuncts; size_t _num_probe_side_columns = 0; size_t _num_build_side_columns = 0; - const bool _old_version_flag; + bool _has_materialized_slot_ids = false; + std::vector _materialized_slot_ids; + bool _enable_lazy_materialize = false; + bool _enable_lazy_probe_finalize = false; + bool _enable_lazy_build_finalize = false; + bool _enable_lazy_mark_finalize = false; + std::set _lazy_eval_column_ids; + std::set _materialize_column_ids; }; } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 368c9b86d0b5f8..1bf1ad26f6d806 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1994,6 +1994,8 @@ public PlanFragment visitPhysicalNestedLoopJoin( .flatMap(e -> e.getInputSlots().stream()) .map(SlotReference.class::cast) .forEach(s -> outputSlotReferenceMap.put(s.getExprId(), s)); + Map materializedSlotReferenceMap = Maps.newHashMap(outputSlotReferenceMap); + nestedLoopJoinNode.enableMaterializedSlotIds(); List outputSlotReferences = Stream.concat(leftTuples.stream(), rightTuples.stream()) .map(TupleDescriptor::getSlots) .flatMap(Collection::stream) @@ -2007,17 +2009,32 @@ public PlanFragment visitPhysicalNestedLoopJoin( for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) { SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId())); SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf); + if (materializedSlotReferenceMap.get(sf.getExprId()) != null) { + nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(sd.getId()); + } + nestedLoopJoinNode.getMaterializedSlotIdMap().put(sf.getExprId(), sd.getId()); leftIntermediateSlotDescriptor.add(sd); } for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) { SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId())); SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf); + if (materializedSlotReferenceMap.get(sf.getExprId()) != null) { + nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(sd.getId()); + } + nestedLoopJoinNode.getMaterializedSlotIdMap().put(sf.getExprId(), sd.getId()); rightIntermediateSlotDescriptor.add(sd); } if (nestedLoopJoin.getMarkJoinSlotReference().isPresent()) { - outputSlotReferences.add(nestedLoopJoin.getMarkJoinSlotReference().get()); - context.createSlotDesc(intermediateDescriptor, nestedLoopJoin.getMarkJoinSlotReference().get()); + SlotReference markJoinSlotReference = nestedLoopJoin.getMarkJoinSlotReference().get(); + outputSlotReferences.add(markJoinSlotReference); + SlotDescriptor markJoinSlotDescriptor = context.createSlotDesc(intermediateDescriptor, + markJoinSlotReference); + if (materializedSlotReferenceMap.get(markJoinSlotReference.getExprId()) != null) { + nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(markJoinSlotDescriptor.getId()); + } + nestedLoopJoinNode.getMaterializedSlotIdMap().put(markJoinSlotReference.getExprId(), + markJoinSlotDescriptor.getId()); } // set slots as nullable for outer join @@ -2053,7 +2070,7 @@ public PlanFragment visitPhysicalNestedLoopJoin( nestedLoopJoinNode.setJoinConjuncts(joinConjuncts); - if (!nestedLoopJoin.getOtherJoinConjuncts().isEmpty()) { + if (!nestedLoopJoin.getMarkJoinConjuncts().isEmpty()) { List markJoinConjuncts = nestedLoopJoin.getMarkJoinConjuncts().stream() .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); nestedLoopJoinNode.setMarkJoinConjuncts(markJoinConjuncts); @@ -2286,6 +2303,18 @@ public PlanFragment visitPhysicalProject(PhysicalProject project break; } } + } else if (joinNode instanceof NestedLoopJoinNode) { + NestedLoopJoinNode nestedLoopJoinNode = (NestedLoopJoinNode) joinNode; + nestedLoopJoinNode.getMaterializedSlotIds().clear(); + nestedLoopJoinNode.enableMaterializedSlotIds(); + Set requiredExprIds = Sets.newHashSet(); + requiredSlotIdSet.forEach(e -> requiredExprIds.add(context.findExprId(e))); + for (ExprId exprId : requiredExprIds) { + SlotId slotId = nestedLoopJoinNode.getMaterializedSlotIdMap().get(exprId); + if (slotId != null) { + nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(slotId); + } + } } return inputFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index daf65dc74d5564..ff788e4c5f1ee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -23,12 +23,20 @@ import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TNestedLoopJoinNode; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Nested loop join between left child and right child. @@ -52,6 +60,12 @@ public class NestedLoopJoinNode extends JoinNodeBase { private List markJoinConjuncts; + private final Set materializedSlotIds = Sets.newHashSet(); + + private final Map materializedSlotIdMap = Maps.newHashMap(); + + private boolean hasMaterializedSlotIds = false; + public static boolean canParallelize(JoinOperator joinOp) { return joinOp == JoinOperator.CROSS_JOIN || joinOp == JoinOperator.INNER_JOIN || joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.LEFT_SEMI_JOIN @@ -69,6 +83,29 @@ public void setMarkJoinConjuncts(List markJoinConjuncts) { this.markJoinConjuncts = markJoinConjuncts; } + public Set getMaterializedSlotIds() { + return materializedSlotIds; + } + + public Map getMaterializedSlotIdMap() { + return materializedSlotIdMap; + } + + public void enableMaterializedSlotIds() { + hasMaterializedSlotIds = true; + } + + public void addSlotIdToMaterializedSlotIds(SlotId slotId) { + materializedSlotIds.add(slotId); + hasMaterializedSlotIds = true; + } + + private List getSortedMaterializedSlotIds() { + List sortedSlotIds = new ArrayList<>(materializedSlotIds); + sortedSlotIds.sort(Comparator.comparingInt(SlotId::asInt)); + return sortedSlotIds; + } + public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, List tupleIds, JoinOperator joinOperator, boolean isMarkJoin) { super(id, "NESTED LOOP JOIN", joinOperator, isMarkJoin); @@ -103,6 +140,13 @@ protected void toThrift(TPlanNode msg) { } msg.nested_loop_join_node.setIsOutputLeftSideOnly(isOutputLeftSideOnly); msg.nested_loop_join_node.setUseSpecificProjections(false); + if (hasMaterializedSlotIds) { + List slotIds = new ArrayList<>(); + for (SlotId slotId : getSortedMaterializedSlotIds()) { + slotIds.add(slotId.asInt()); + } + msg.nested_loop_join_node.setMaterializedSlotIds(slotIds); + } msg.node_type = TPlanNodeType.CROSS_JOIN_NODE; } @@ -148,6 +192,13 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } output.append("\n"); } + if (hasMaterializedSlotIds) { + output.append(detailPrefix).append("materialized slot ids: "); + for (SlotId slotId : getSortedMaterializedSlotIds()) { + output.append(slotId).append(" "); + } + output.append("\n"); + } if (detailLevel == TExplainLevel.VERBOSE) { output.append(detailPrefix).append("isMarkJoin: ").append(isMarkJoin()).append("\n"); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 843087ff45a4c8..864ee91b1dc55a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1049,6 +1049,11 @@ struct TNestedLoopJoinNode { 9: optional list mark_join_conjuncts // deprecated 10: optional bool use_specific_projections + + // Slots that need to be materialized after join conjunct evaluation. + // If this field is not set, BE keeps the legacy behavior. + // If this field is set to an empty list, no payload slot needs materialization. + 11: optional list materialized_slot_ids } struct TMergeJoinNode { diff --git a/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy b/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy new file mode 100644 index 00000000000000..8282ee90f389f3 --- /dev/null +++ b/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_nestedloop_lazy_materialization") { + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set disable_join_reorder=true" + + sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_probe" + sql """ + CREATE TABLE test_nestedloop_lazy_materialization_probe ( + id INT NOT NULL, + v INT NOT NULL, + flag INT NOT NULL + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); + """ + + sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_build" + sql """ + CREATE TABLE test_nestedloop_lazy_materialization_build ( + id INT NOT NULL, + v INT NOT NULL, + flag INT NOT NULL + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); + """ + + sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_null_build" + sql """ + CREATE TABLE test_nestedloop_lazy_materialization_null_build ( + id INT NOT NULL, + v INT NULL + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); + """ + + sql """ + INSERT INTO test_nestedloop_lazy_materialization_probe VALUES + (1, 1, 10), + (2, 3, 20), + (3, 5, 30), + (4, 7, 40), + (5, 200, 50); + """ + sql """ + INSERT INTO test_nestedloop_lazy_materialization_build VALUES + (5, 0, 1), + (10, 2, 1), + (20, 4, 0), + (30, 6, 1), + (40, 8, 0), + (50, 100, 1); + """ + sql """ + INSERT INTO test_nestedloop_lazy_materialization_null_build VALUES + (1, NULL), + (2, 2), + (3, 4); + """ + sql "sync" + + // Eval-only demand: predicate columns are real in the temporary eval block, + // but no payload column is needed after the join for count(*). + def countOnly = sql """ + SELECT COUNT(*) + FROM test_nestedloop_lazy_materialization_probe p + JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v AND p.flag + b.flag > 10; + """ + assertEquals("12", countOnly[0][0].toString()) + + // Materialize-only payload: p.id is not needed by ON predicate, but is needed + // after predicate filtering as final output. + def materializeOnly = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v AND p.flag + b.flag > 10 + ORDER BY p.id, b.id; + """ + assertEquals([[1], [1], [1], [2], [2], [2], [2], [3], [3], [3], [4], [4]], + materializeOnly) + + // Overlap demand: p.v and b.v are needed both by ON predicate and by output. + def evalAndMaterialize = sql """ + SELECT p.v, b.v + FROM test_nestedloop_lazy_materialization_probe p + JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v AND b.flag = 1 + WHERE p.v < 4 + ORDER BY p.v, b.v; + """ + assertEquals([[1, 2], [1, 6], [1, 100], [3, 6], [3, 100]], evalAndMaterialize) + + // Post-join filter demand: b.flag is not required by ON predicate here, but + // it must be materialized because WHERE is evaluated on the joined block. + def postJoinFilter = sql """ + SELECT p.id, b.id + FROM test_nestedloop_lazy_materialization_probe p + LEFT OUTER JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v + WHERE b.flag = 1 + ORDER BY p.id, b.id; + """ + assertEquals([[1, 10], [1, 30], [1, 50], [2, 30], [2, 50], [3, 30], [3, 50], [4, 50]], + postJoinFilter) + + // Nullable eval demand: RIGHT/FULL outer joins make probe-side intermediate + // slots nullable, while the source probe block column is still non-nullable. + def rightOuterNullableEval = sql """ + SELECT b.id + FROM test_nestedloop_lazy_materialization_probe p + RIGHT OUTER JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v + ORDER BY b.id, p.id; + """ + assertEquals([[5], [10], [20], [20], [30], [30], [30], [40], [40], [40], [40], + [50], [50], [50], [50]], rightOuterNullableEval) + + // Build-side and probe-side finalization should be resumable across output + // batches when lazy FULL OUTER JOIN has many unmatched rows. + def fullOuterResume = sql """ + SELECT /*+SET_VAR(batch_size=2)*/ COALESCE(p.id, -1), COALESCE(b.id, -1) + FROM test_nestedloop_lazy_materialization_probe p + FULL OUTER JOIN test_nestedloop_lazy_materialization_build b + ON p.v + 100 < b.v + ORDER BY 1, 2; + """ + assertEquals([[-1, 5], [-1, 10], [-1, 20], [-1, 30], [-1, 40], [-1, 50], + [1, -1], [2, -1], [3, -1], [4, -1], [5, -1]], fullOuterResume) + + // Probe-side matched tracking and finalization for lazy LEFT SEMI JOIN. + def leftSemi = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + LEFT SEMI JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v AND b.flag = 1 + ORDER BY p.id; + """ + assertEquals([[1], [2], [3], [4]], leftSemi) + + // Probe-side unmatched tracking and finalization for lazy LEFT ANTI JOIN. + def leftAnti = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + LEFT ANTI JOIN test_nestedloop_lazy_materialization_build b + ON p.v < b.v AND b.flag = 1 + ORDER BY p.id; + """ + assertEquals([[5]], leftAnti) + + // MARK LEFT SEMI lazy finalization: EXISTS is represented as a mark value + // because it is combined with an OR predicate. + def markLeftSemi = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + WHERE EXISTS ( + SELECT * + FROM test_nestedloop_lazy_materialization_build b + WHERE p.v < b.v AND b.flag = 1 + ) OR p.id = 5 + ORDER BY p.id; + """ + assertEquals([[1], [2], [3], [4], [5]], markLeftSemi) + + // MARK LEFT ANTI lazy finalization: NOT EXISTS inverts the mark value for + // rows without a successful predicate match. + def markLeftAnti = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + WHERE NOT EXISTS ( + SELECT * + FROM test_nestedloop_lazy_materialization_build b + WHERE p.v < b.v AND b.flag = 1 + ) OR p.id = 1 + ORDER BY p.id; + """ + assertEquals([[1], [5]], markLeftAnti) + + // MARK NULL state: IN can produce NULL when the correlated subquery contains + // NULL and no equal value, and IS NULL observes that mark state. + def markNull = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + WHERE (p.v IN ( + SELECT b.v + FROM test_nestedloop_lazy_materialization_null_build b + WHERE p.id > b.id + )) IS NULL + ORDER BY p.id; + """ + assertEquals([[2], [3], [4], [5]], markNull) + + // NULL-aware anti join lazy finalization: the NULL in the correlated subquery + // result should make NOT IN unknown for rows whose subquery sees it. + def nullAwareAnti = sql """ + SELECT p.id + FROM test_nestedloop_lazy_materialization_probe p + WHERE p.v NOT IN ( + SELECT b.v + FROM test_nestedloop_lazy_materialization_null_build b + WHERE p.id > b.id + ) + ORDER BY p.id; + """ + assertEquals([[1]], nullAwareAnti) +}