Skip to content

Commit f680b2e

Browse files
Merge pull request #81 from influxdata/upgrade-df-ver5030-c
Patched DF 50.3.0 (revision c)
2 parents 7909909 + da0cdf2 commit f680b2e

File tree

11 files changed

+525
-27
lines changed

11 files changed

+525
-27
lines changed

.github/workflows/rust.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,21 @@ jobs:
266266
runs-on: ubuntu-latest
267267
container:
268268
image: amd64/rust
269+
volumes:
270+
- /usr/local:/host/usr/local
269271
steps:
272+
- name: Remove unnecessary preinstalled software
273+
run: |
274+
echo "Disk space before cleanup:"
275+
df -h
276+
# remove tool cache: about 8.5GB (github has host /opt/hostedtoolcache mounted as /__t)
277+
rm -rf /__t/* || true
278+
# remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup)
279+
rm -rf /host/usr/local/.ghcup || true
280+
# remove Android library: about 7.8GB (host /usr/local/lib/android)
281+
rm -rf /host/usr/local/lib/android || true
282+
echo "Disk space after cleanup:"
283+
df -h
270284
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
271285
with:
272286
submodules: true

datafusion/core/src/physical_planner.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -710,10 +710,15 @@ impl DefaultPhysicalPlanner {
710710
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
711711
}
712712
}
713-
return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
714-
.iter()
715-
.map(|s| format!("\n\t- {s}"))
716-
.join(""));
713+
714+
log::debug!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
715+
716+
//influx: temporarily remove error and only log so that we can find a
717+
//reproducer in production
718+
// return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
719+
// .iter()
720+
// .map(|s| format!("\n\t- {s}"))
721+
// .join(""));
717722
}
718723

719724
let groups = self.create_grouping_physical_expr(

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 321 additions & 11 deletions
Large diffs are not rendered by default.

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818
use std::sync::Arc;
1919

2020
use crate::memory_limit::DummyStreamPartition;
21+
use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
2122
use crate::physical_optimizer::test_utils::{
2223
aggregate_exec, bounded_window_exec, bounded_window_exec_with_partition,
2324
check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema,
2425
create_test_schema2, create_test_schema3, filter_exec, global_limit_exec,
2526
hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort,
26-
projection_exec, repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr,
27-
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
28-
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
29-
union_exec, RequirementsTestExec,
27+
parquet_exec_with_stats, projection_exec, repartition_exec, schema, sort_exec,
28+
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
29+
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
30+
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
3031
};
3132

3233
use arrow::compute::SortOptions;
@@ -48,6 +49,9 @@ use datafusion_physical_expr_common::sort_expr::{
4849
};
4950
use datafusion_physical_expr::{Distribution, Partitioning};
5051
use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, NotExpr};
52+
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
53+
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
54+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5155
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
5256
use datafusion_physical_plan::repartition::RepartitionExec;
5357
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -2302,6 +2306,93 @@ async fn test_commutativity() -> Result<()> {
23022306
Ok(())
23032307
}
23042308

2309+
fn single_partition_aggregate(
2310+
input: Arc<dyn ExecutionPlan>,
2311+
alias_pairs: Vec<(String, String)>,
2312+
) -> Arc<dyn ExecutionPlan> {
2313+
let schema = schema();
2314+
let group_by = alias_pairs
2315+
.iter()
2316+
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
2317+
.collect::<Vec<_>>();
2318+
let group_by = PhysicalGroupBy::new_single(group_by);
2319+
2320+
Arc::new(
2321+
AggregateExec::try_new(
2322+
AggregateMode::SinglePartitioned,
2323+
group_by,
2324+
vec![],
2325+
vec![],
2326+
input,
2327+
schema,
2328+
)
2329+
.unwrap(),
2330+
)
2331+
}
2332+
2333+
#[tokio::test]
2334+
async fn test_preserve_needed_coalesce() -> Result<()> {
2335+
// Input to EnforceSorting, from our test case.
2336+
let plan = projection_exec_with_alias(
2337+
union_exec(vec![parquet_exec_with_stats(10000); 2]),
2338+
vec![
2339+
("a".to_string(), "a".to_string()),
2340+
("b".to_string(), "value".to_string()),
2341+
],
2342+
);
2343+
let plan = Arc::new(CoalescePartitionsExec::new(plan));
2344+
let schema = schema();
2345+
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
2346+
expr: col("a", &schema).unwrap(),
2347+
options: SortOptions::default(),
2348+
}])
2349+
.unwrap();
2350+
let plan: Arc<dyn ExecutionPlan> =
2351+
single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
2352+
let plan = sort_exec(sort_key, plan);
2353+
2354+
// Starting plan: as in our test case.
2355+
assert_eq!(
2356+
get_plan_string(&plan),
2357+
vec![
2358+
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2359+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2360+
" CoalescePartitionsExec",
2361+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2362+
" UnionExec",
2363+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2364+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2365+
],
2366+
);
2367+
2368+
let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
2369+
assert!(checker.is_ok());
2370+
2371+
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2372+
let optimizer = EnforceSorting::new();
2373+
let optimized = optimizer.optimize(plan, &Default::default())?;
2374+
assert_eq!(
2375+
get_plan_string(&optimized),
2376+
vec![
2377+
"SortPreservingMergeExec: [a@0 ASC]",
2378+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2379+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2380+
" CoalescePartitionsExec",
2381+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2382+
" UnionExec",
2383+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2384+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2385+
],
2386+
);
2387+
2388+
// Plan is valid.
2389+
let checker = SanityCheckPlan::new();
2390+
let checker = checker.optimize(optimized, &Default::default());
2391+
assert!(checker.is_ok());
2392+
2393+
Ok(())
2394+
}
2395+
23052396
#[tokio::test]
23062397
async fn test_coalesce_propagate() -> Result<()> {
23072398
let schema = create_test_schema()?;

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,13 @@ pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<
515515
.data()
516516
}
517517

518+
pub fn trim_plan_display(plan: &str) -> Vec<&str> {
519+
plan.split('\n')
520+
.map(|s| s.trim())
521+
.filter(|s| !s.is_empty())
522+
.collect()
523+
}
524+
518525
// construct a stream partition for test purposes
519526
#[derive(Debug)]
520527
pub struct TestStreamPartition {

datafusion/expr/src/udaf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync {
458458

459459
// exclude the first function argument(= column) in ordered set aggregate function,
460460
// because it is duplicated with the WITHIN GROUP clause in schema name.
461-
let args = if self.is_ordered_set_aggregate() {
461+
let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
462462
&args[1..]
463463
} else {
464464
&args[..]

datafusion/physical-expr/src/equivalence/properties/union.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,43 @@ fn calculate_union_binary(
6767
})
6868
.collect::<Vec<_>>();
6969

70+
// TEMP HACK WORKAROUND
71+
// Revert code from https://github.com/apache/datafusion/pull/12562
72+
// Context: https://github.com/apache/datafusion/issues/13748
73+
// Context: https://github.com/influxdata/influxdb_iox/issues/13038
74+
7075
// Next, calculate valid orderings for the union by searching for prefixes
7176
// in both sides.
72-
let mut orderings = UnionEquivalentOrderingBuilder::new();
73-
orderings.add_satisfied_orderings(&lhs, &rhs)?;
74-
orderings.add_satisfied_orderings(&rhs, &lhs)?;
75-
let orderings = orderings.build();
77+
let mut orderings = vec![];
78+
for ordering in lhs.normalized_oeq_class().into_iter() {
79+
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
80+
81+
// Progressively shorten the ordering to search for a satisfied prefix:
82+
while !rhs.ordering_satisfy(ordering.clone())? {
83+
ordering.pop();
84+
}
85+
// There is a non-trivial satisfied prefix, add it as a valid ordering:
86+
if !ordering.is_empty() {
87+
orderings.push(ordering);
88+
}
89+
}
7690

91+
for ordering in rhs.normalized_oeq_class().into_iter() {
92+
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
93+
94+
// Progressively shorten the ordering to search for a satisfied prefix:
95+
while !lhs.ordering_satisfy(ordering.clone())? {
96+
ordering.pop();
97+
}
98+
// There is a non-trivial satisfied prefix, add it as a valid ordering:
99+
if !ordering.is_empty() {
100+
orderings.push(ordering);
101+
}
102+
}
77103
let mut eq_properties = EquivalenceProperties::new(lhs.schema);
78104
eq_properties.add_constants(constants)?;
79105
eq_properties.add_orderings(orderings);
106+
80107
Ok(eq_properties)
81108
}
82109

@@ -122,6 +149,7 @@ struct UnionEquivalentOrderingBuilder {
122149
orderings: Vec<LexOrdering>,
123150
}
124151

152+
#[expect(unused)]
125153
impl UnionEquivalentOrderingBuilder {
126154
fn new() -> Self {
127155
Self { orderings: vec![] }
@@ -504,6 +532,7 @@ mod tests {
504532
}
505533

506534
#[test]
535+
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
507536
fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
508537
let schema = create_test_schema().unwrap();
509538
UnionEquivalenceTest::new(&schema)
@@ -579,6 +608,7 @@ mod tests {
579608
}
580609

581610
#[test]
611+
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
582612
fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()>
583613
{
584614
let schema = create_test_schema().unwrap();
@@ -607,6 +637,7 @@ mod tests {
607637
}
608638

609639
#[test]
640+
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
610641
fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
611642
let schema = create_test_schema().unwrap();
612643
UnionEquivalenceTest::new(&schema)
@@ -658,6 +689,7 @@ mod tests {
658689
}
659690

660691
#[test]
692+
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
661693
fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
662694
let schema = create_test_schema().unwrap();
663695
UnionEquivalenceTest::new(&schema)

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ use crate::enforce_sorting::sort_pushdown::{
4848
};
4949
use crate::output_requirements::OutputRequirementExec;
5050
use crate::utils::{
51-
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
52-
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
51+
add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
52+
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
5353
};
5454
use crate::PhysicalOptimizerRule;
5555

@@ -678,7 +678,7 @@ fn remove_bottleneck_in_subplan(
678678
) -> Result<PlanWithCorrespondingCoalescePartitions> {
679679
let plan = &requirements.plan;
680680
let children = &mut requirements.children;
681-
if is_coalesce_partitions(&children[0].plan) {
681+
if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
682682
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
683683
let mut new_child_node = children[0].children.swap_remove(0);
684684
while new_child_node.plan.output_partitioning() == plan.output_partitioning()

datafusion/physical-optimizer/src/sanity_checker.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3232
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
3333
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
3434
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35+
use datafusion_physical_plan::sorts::sort::SortExec;
36+
use datafusion_physical_plan::union::UnionExec;
3537
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
3638

3739
use crate::PhysicalOptimizerRule;
@@ -135,6 +137,14 @@ pub fn check_plan_sanity(
135137
plan.required_input_ordering(),
136138
plan.required_input_distribution(),
137139
) {
140+
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
141+
if child.as_any().downcast_ref::<UnionExec>().is_some() {
142+
continue;
143+
}
144+
if child.as_any().downcast_ref::<SortExec>().is_some() {
145+
continue;
146+
}
147+
138148
let child_eq_props = child.equivalence_properties();
139149
if let Some(sort_req) = sort_req {
140150
let sort_req = sort_req.into_single();

datafusion/physical-optimizer/src/utils.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion_common::Result;
2121
use datafusion_physical_expr::{LexOrdering, LexRequirement};
22+
use datafusion_physical_plan::aggregates::AggregateExec;
2223
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2324
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2425
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -113,3 +114,8 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
113114
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
114115
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
115116
}
117+
118+
/// Checks whether the given operator is a [`AggregateExec`].
119+
pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
120+
plan.as_any().is::<AggregateExec>()
121+
}

0 commit comments

Comments
 (0)