@@ -651,7 +651,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
651651 int node_idx = 0 ;
652652
653653 RETURN_IF_ERROR (_create_tree_helper (pool, _params.fragment .plan .nodes , descs, nullptr ,
654- &node_idx, root, cur_pipe, 0 , false ));
654+ &node_idx, root, cur_pipe, 0 , false , false ));
655655
656656 if (node_idx + 1 != _params.fragment .plan .nodes .size ()) {
657657 return Status::InternalError (
@@ -660,12 +660,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
660660 return Status::OK ();
661661}
662662
663- Status PipelineFragmentContext::_create_tree_helper (ObjectPool* pool,
664- const std::vector<TPlanNode>& tnodes,
665- const DescriptorTbl& descs, OperatorPtr parent,
666- int * node_idx, OperatorPtr* root,
667- PipelinePtr& cur_pipe, int child_idx,
668- const bool followed_by_shuffled_operator) {
663+ Status PipelineFragmentContext::_create_tree_helper (
664+ ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
665+ OperatorPtr parent, int * node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
666+ const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
669667 // propagate error case
670668 if (*node_idx >= tnodes.size ()) {
671669 return Status::InternalError (
@@ -676,10 +674,12 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
676674
677675 int num_children = tnodes[*node_idx].num_children ;
678676 bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
677+ bool current_require_bucket_distribution = require_bucket_distribution;
679678 OperatorPtr op = nullptr ;
680679 RETURN_IF_ERROR (_create_operator (pool, tnodes[*node_idx], descs, op, cur_pipe,
681680 parent == nullptr ? -1 : parent->node_id (), child_idx,
682- followed_by_shuffled_operator));
681+ followed_by_shuffled_operator,
682+ current_require_bucket_distribution));
683683 // Initialization must be done here. For example, group by expressions in agg will be used to
684684 // decide if a local shuffle should be planed, so it must be initialized here.
685685 RETURN_IF_ERROR (op->init (tnode, _runtime_state.get ()));
@@ -711,14 +711,21 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
711711 : op->is_shuffled_operator ())) &&
712712 Pipeline::is_hash_exchange (required_data_distribution.distribution_type );
713713
714+ current_require_bucket_distribution =
715+ (require_bucket_distribution ||
716+ (cur_pipe->operators ().empty () ? cur_pipe->sink ()->is_colocated_operator ()
717+ : op->is_colocated_operator ())) &&
718+ Pipeline::is_hash_exchange (required_data_distribution.distribution_type );
719+
714720 if (num_children == 0 ) {
715721 _use_serial_source = op->is_serial_operator ();
716722 }
717723 // rely on that tnodes is preorder of the plan
718724 for (int i = 0 ; i < num_children; i++) {
719725 ++*node_idx;
720726 RETURN_IF_ERROR (_create_tree_helper (pool, tnodes, descs, op, node_idx, nullptr , cur_pipe, i,
721- current_followed_by_shuffled_operator));
727+ current_followed_by_shuffled_operator,
728+ current_require_bucket_distribution));
722729
723730 // we are expecting a child, but have used all nodes
724731 // this means we have been given a bad tree and must fail
@@ -1213,18 +1220,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
12131220 const DescriptorTbl& descs, OperatorPtr& op,
12141221 PipelinePtr& cur_pipe, int parent_idx,
12151222 int child_idx,
1216- const bool followed_by_shuffled_operator) {
1223+ const bool followed_by_shuffled_operator,
1224+ const bool require_bucket_distribution) {
12171225 std::vector<DataSinkOperatorPtr> sink_ops;
12181226 Defer defer = Defer ([&]() {
12191227 if (op) {
1220- op->update_operator (tnode, followed_by_shuffled_operator, _require_bucket_distribution);
1221- _require_bucket_distribution =
1222- _require_bucket_distribution || op->is_colocated_operator ();
1228+ op->update_operator (tnode, followed_by_shuffled_operator, require_bucket_distribution);
12231229 }
12241230 for (auto & s : sink_ops) {
1225- s->update_operator (tnode, followed_by_shuffled_operator, _require_bucket_distribution);
1226- _require_bucket_distribution =
1227- _require_bucket_distribution || s->is_colocated_operator ();
1231+ s->update_operator (tnode, followed_by_shuffled_operator, require_bucket_distribution);
12281232 }
12291233 });
12301234 // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
@@ -1599,15 +1603,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
15991603 break ;
16001604 }
16011605 case TPlanNodeType::INTERSECT_NODE: {
1602- RETURN_IF_ERROR (_build_operators_for_set_operation_node<true >(
1603- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
1604- !tnode.intersect_node .is_colocate || followed_by_shuffled_operator, sink_ops));
1606+ RETURN_IF_ERROR (_build_operators_for_set_operation_node<true >(pool, tnode, descs, op,
1607+ cur_pipe, sink_ops));
16051608 break ;
16061609 }
16071610 case TPlanNodeType::EXCEPT_NODE: {
1608- RETURN_IF_ERROR (_build_operators_for_set_operation_node<false >(
1609- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
1610- !tnode.except_node .is_colocate || followed_by_shuffled_operator, sink_ops));
1611+ RETURN_IF_ERROR (_build_operators_for_set_operation_node<false >(pool, tnode, descs, op,
1612+ cur_pipe, sink_ops));
16111613 break ;
16121614 }
16131615 case TPlanNodeType::REPEAT_NODE: {
@@ -1704,8 +1706,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
17041706template <bool is_intersect>
17051707Status PipelineFragmentContext::_build_operators_for_set_operation_node (
17061708 ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1707- PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool followed_by_shuffled_operator,
1708- std::vector<DataSinkOperatorPtr>& sink_ops) {
1709+ PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
17091710 op.reset (new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id (), descs));
17101711 RETURN_IF_ERROR (cur_pipe->add_operator (op, _parallel_instances));
17111712
@@ -2104,8 +2105,6 @@ Status PipelineFragmentContext::set_to_rerun() {
21042105Status PipelineFragmentContext::rebuild (ThreadPool* thread_pool) {
21052106 _submitted = false ;
21062107 _is_fragment_instance_closed = false ;
2107- // _require_bucket_distribution may be set to true to affect the building of pipeline with local shuffle
2108- _require_bucket_distribution = false ;
21092108 return _build_and_prepare_full_pipeline (thread_pool);
21102109}
21112110
0 commit comments