Skip to content

Commit c594f06

Browse files
Revert "Patched DF 50.3.0 (revision c)"
1 parent f680b2e commit c594f06

File tree

11 files changed

+27
-525
lines changed

11 files changed

+27
-525
lines changed

.github/workflows/rust.yml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -266,21 +266,7 @@ jobs:
266266
runs-on: ubuntu-latest
267267
container:
268268
image: amd64/rust
269-
volumes:
270-
- /usr/local:/host/usr/local
271269
steps:
272-
- name: Remove unnecessary preinstalled software
273-
run: |
274-
echo "Disk space before cleanup:"
275-
df -h
276-
# remove tool cache: about 8.5GB (github has host /opt/hostedtoolcache mounted as /__t)
277-
rm -rf /__t/* || true
278-
# remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup)
279-
rm -rf /host/usr/local/.ghcup || true
280-
# remove Android library: about 7.8GB (host /usr/local/lib/android)
281-
rm -rf /host/usr/local/lib/android || true
282-
echo "Disk space after cleanup:"
283-
df -h
284270
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
285271
with:
286272
submodules: true

datafusion/core/src/physical_planner.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -710,15 +710,10 @@ impl DefaultPhysicalPlanner {
710710
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
711711
}
712712
}
713-
714-
log::debug!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
715-
716-
//influx: temporarily remove error and only log so that we can find a
717-
//reproducer in production
718-
// return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
719-
// .iter()
720-
// .map(|s| format!("\n\t- {s}"))
721-
// .join(""));
713+
return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
714+
.iter()
715+
.map(|s| format!("\n\t- {s}"))
716+
.join(""));
722717
}
723718

724719
let groups = self.create_grouping_physical_expr(

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 11 additions & 321 deletions
Large diffs are not rendered by default.

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 4 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818
use std::sync::Arc;
1919

2020
use crate::memory_limit::DummyStreamPartition;
21-
use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
2221
use crate::physical_optimizer::test_utils::{
2322
aggregate_exec, bounded_window_exec, bounded_window_exec_with_partition,
2423
check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema,
2524
create_test_schema2, create_test_schema3, filter_exec, global_limit_exec,
2625
hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort,
27-
parquet_exec_with_stats, projection_exec, repartition_exec, schema, sort_exec,
28-
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
29-
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
30-
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
26+
projection_exec, repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr,
27+
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
28+
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
29+
union_exec, RequirementsTestExec,
3130
};
3231

3332
use arrow::compute::SortOptions;
@@ -49,9 +48,6 @@ use datafusion_physical_expr_common::sort_expr::{
4948
};
5049
use datafusion_physical_expr::{Distribution, Partitioning};
5150
use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, NotExpr};
52-
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
53-
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
54-
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5551
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
5652
use datafusion_physical_plan::repartition::RepartitionExec;
5753
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -2306,93 +2302,6 @@ async fn test_commutativity() -> Result<()> {
23062302
Ok(())
23072303
}
23082304

2309-
fn single_partition_aggregate(
2310-
input: Arc<dyn ExecutionPlan>,
2311-
alias_pairs: Vec<(String, String)>,
2312-
) -> Arc<dyn ExecutionPlan> {
2313-
let schema = schema();
2314-
let group_by = alias_pairs
2315-
.iter()
2316-
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
2317-
.collect::<Vec<_>>();
2318-
let group_by = PhysicalGroupBy::new_single(group_by);
2319-
2320-
Arc::new(
2321-
AggregateExec::try_new(
2322-
AggregateMode::SinglePartitioned,
2323-
group_by,
2324-
vec![],
2325-
vec![],
2326-
input,
2327-
schema,
2328-
)
2329-
.unwrap(),
2330-
)
2331-
}
2332-
2333-
#[tokio::test]
2334-
async fn test_preserve_needed_coalesce() -> Result<()> {
2335-
// Input to EnforceSorting, from our test case.
2336-
let plan = projection_exec_with_alias(
2337-
union_exec(vec![parquet_exec_with_stats(10000); 2]),
2338-
vec![
2339-
("a".to_string(), "a".to_string()),
2340-
("b".to_string(), "value".to_string()),
2341-
],
2342-
);
2343-
let plan = Arc::new(CoalescePartitionsExec::new(plan));
2344-
let schema = schema();
2345-
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
2346-
expr: col("a", &schema).unwrap(),
2347-
options: SortOptions::default(),
2348-
}])
2349-
.unwrap();
2350-
let plan: Arc<dyn ExecutionPlan> =
2351-
single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
2352-
let plan = sort_exec(sort_key, plan);
2353-
2354-
// Starting plan: as in our test case.
2355-
assert_eq!(
2356-
get_plan_string(&plan),
2357-
vec![
2358-
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2359-
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2360-
" CoalescePartitionsExec",
2361-
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2362-
" UnionExec",
2363-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2364-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2365-
],
2366-
);
2367-
2368-
let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
2369-
assert!(checker.is_ok());
2370-
2371-
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2372-
let optimizer = EnforceSorting::new();
2373-
let optimized = optimizer.optimize(plan, &Default::default())?;
2374-
assert_eq!(
2375-
get_plan_string(&optimized),
2376-
vec![
2377-
"SortPreservingMergeExec: [a@0 ASC]",
2378-
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2379-
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2380-
" CoalescePartitionsExec",
2381-
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2382-
" UnionExec",
2383-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2384-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2385-
],
2386-
);
2387-
2388-
// Plan is valid.
2389-
let checker = SanityCheckPlan::new();
2390-
let checker = checker.optimize(optimized, &Default::default());
2391-
assert!(checker.is_ok());
2392-
2393-
Ok(())
2394-
}
2395-
23962305
#[tokio::test]
23972306
async fn test_coalesce_propagate() -> Result<()> {
23982307
let schema = create_test_schema()?;

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,13 +515,6 @@ pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<
515515
.data()
516516
}
517517

518-
pub fn trim_plan_display(plan: &str) -> Vec<&str> {
519-
plan.split('\n')
520-
.map(|s| s.trim())
521-
.filter(|s| !s.is_empty())
522-
.collect()
523-
}
524-
525518
// construct a stream partition for test purposes
526519
#[derive(Debug)]
527520
pub struct TestStreamPartition {

datafusion/expr/src/udaf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync {
458458

459459
// exclude the first function argument(= column) in ordered set aggregate function,
460460
// because it is duplicated with the WITHIN GROUP clause in schema name.
461-
let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
461+
let args = if self.is_ordered_set_aggregate() {
462462
&args[1..]
463463
} else {
464464
&args[..]

datafusion/physical-expr/src/equivalence/properties/union.rs

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -67,43 +67,16 @@ fn calculate_union_binary(
6767
})
6868
.collect::<Vec<_>>();
6969

70-
// TEMP HACK WORKAROUND
71-
// Revert code from https://github.com/apache/datafusion/pull/12562
72-
// Context: https://github.com/apache/datafusion/issues/13748
73-
// Context: https://github.com/influxdata/influxdb_iox/issues/13038
74-
7570
// Next, calculate valid orderings for the union by searching for prefixes
7671
// in both sides.
77-
let mut orderings = vec![];
78-
for ordering in lhs.normalized_oeq_class().into_iter() {
79-
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
80-
81-
// Progressively shorten the ordering to search for a satisfied prefix:
82-
while !rhs.ordering_satisfy(ordering.clone())? {
83-
ordering.pop();
84-
}
85-
// There is a non-trivial satisfied prefix, add it as a valid ordering:
86-
if !ordering.is_empty() {
87-
orderings.push(ordering);
88-
}
89-
}
72+
let mut orderings = UnionEquivalentOrderingBuilder::new();
73+
orderings.add_satisfied_orderings(&lhs, &rhs)?;
74+
orderings.add_satisfied_orderings(&rhs, &lhs)?;
75+
let orderings = orderings.build();
9076

91-
for ordering in rhs.normalized_oeq_class().into_iter() {
92-
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
93-
94-
// Progressively shorten the ordering to search for a satisfied prefix:
95-
while !lhs.ordering_satisfy(ordering.clone())? {
96-
ordering.pop();
97-
}
98-
// There is a non-trivial satisfied prefix, add it as a valid ordering:
99-
if !ordering.is_empty() {
100-
orderings.push(ordering);
101-
}
102-
}
10377
let mut eq_properties = EquivalenceProperties::new(lhs.schema);
10478
eq_properties.add_constants(constants)?;
10579
eq_properties.add_orderings(orderings);
106-
10780
Ok(eq_properties)
10881
}
10982

@@ -149,7 +122,6 @@ struct UnionEquivalentOrderingBuilder {
149122
orderings: Vec<LexOrdering>,
150123
}
151124

152-
#[expect(unused)]
153125
impl UnionEquivalentOrderingBuilder {
154126
fn new() -> Self {
155127
Self { orderings: vec![] }
@@ -532,7 +504,6 @@ mod tests {
532504
}
533505

534506
#[test]
535-
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
536507
fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
537508
let schema = create_test_schema().unwrap();
538509
UnionEquivalenceTest::new(&schema)
@@ -608,7 +579,6 @@ mod tests {
608579
}
609580

610581
#[test]
611-
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
612582
fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()>
613583
{
614584
let schema = create_test_schema().unwrap();
@@ -637,7 +607,6 @@ mod tests {
637607
}
638608

639609
#[test]
640-
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
641610
fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
642611
let schema = create_test_schema().unwrap();
643612
UnionEquivalenceTest::new(&schema)
@@ -689,7 +658,6 @@ mod tests {
689658
}
690659

691660
#[test]
692-
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
693661
fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
694662
let schema = create_test_schema().unwrap();
695663
UnionEquivalenceTest::new(&schema)

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ use crate::enforce_sorting::sort_pushdown::{
4848
};
4949
use crate::output_requirements::OutputRequirementExec;
5050
use crate::utils::{
51-
add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
52-
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
51+
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
52+
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
5353
};
5454
use crate::PhysicalOptimizerRule;
5555

@@ -678,7 +678,7 @@ fn remove_bottleneck_in_subplan(
678678
) -> Result<PlanWithCorrespondingCoalescePartitions> {
679679
let plan = &requirements.plan;
680680
let children = &mut requirements.children;
681-
if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
681+
if is_coalesce_partitions(&children[0].plan) {
682682
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
683683
let mut new_child_node = children[0].children.swap_remove(0);
684684
while new_child_node.plan.output_partitioning() == plan.output_partitioning()

datafusion/physical-optimizer/src/sanity_checker.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3232
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
3333
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
3434
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
35-
use datafusion_physical_plan::sorts::sort::SortExec;
36-
use datafusion_physical_plan::union::UnionExec;
3735
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
3836

3937
use crate::PhysicalOptimizerRule;
@@ -137,14 +135,6 @@ pub fn check_plan_sanity(
137135
plan.required_input_ordering(),
138136
plan.required_input_distribution(),
139137
) {
140-
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
141-
if child.as_any().downcast_ref::<UnionExec>().is_some() {
142-
continue;
143-
}
144-
if child.as_any().downcast_ref::<SortExec>().is_some() {
145-
continue;
146-
}
147-
148138
let child_eq_props = child.equivalence_properties();
149139
if let Some(sort_req) = sort_req {
150140
let sort_req = sort_req.into_single();

datafusion/physical-optimizer/src/utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::sync::Arc;
1919

2020
use datafusion_common::Result;
2121
use datafusion_physical_expr::{LexOrdering, LexRequirement};
22-
use datafusion_physical_plan::aggregates::AggregateExec;
2322
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2423
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2524
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -114,8 +113,3 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
114113
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
115114
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
116115
}
117-
118-
/// Checks whether the given operator is a [`AggregateExec`].
119-
pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
120-
plan.as_any().is::<AggregateExec>()
121-
}

0 commit comments

Comments
 (0)