Skip to content

[Data] Disable resource throttling for UnionOperator #60627

@bveeramani

Description

@bveeramani

Description

The UnionOperator should have throttling disabled since it doesn't launch any tasks. Currently, it inherits the default throttling_disabled() behavior (returns False), which means it gets allocated resources unnecessarily. The fix is to override throttling_disabled() to return True and add a unit test to verify this behavior.

Background

Ray Data's execution engine uses a resource management system that allocates memory and CPU resources to operators. Operators can opt out of this resource allocation by overriding the throttling_disabled() method to return True.

The throttling_disabled() method is defined in the base PhysicalOperator class at python/ray/data/_internal/execution/interfaces/physical_operator.py:652-659:

def throttling_disabled(self) -> bool:
    """Whether to disable resource throttling for this operator.

    This should return True for operators that only manipulate bundle metadata
    (e.g., the OutputSplitter operator). This hints to the execution engine that
    these operators should not be throttled based on resource usage.
    """
    return False  # Default: throttling is ENABLED

When throttling_disabled() returns True:

  • The operator is skipped in resource reservation calculations (ResourceManager.is_op_eligible())
  • The operator gets higher priority in the ranker (DefaultRanker.rank_operator())

Several operators already disable throttling because they don't launch tasks:

  • LimitOperator - python/ray/data/_internal/execution/operators/limit_operator.py:132
  • OutputSplitter - python/ray/data/_internal/execution/operators/output_splitter.py:111-118
  • AggregateNumRows - python/ray/data/_internal/execution/operators/aggregate_num_rows.py:60-61

Motivation

The UnionOperator combines output blocks from multiple input operators into a single output stream. It operates purely on bundle metadata - it doesn't process data or launch any Ray tasks. It simply routes RefBundle objects from input buffers to an output buffer.

Because Union doesn't consume computational resources, allocating resources to it is wasteful and can negatively impact scheduling of operators that actually need resources.

Implementation Boundaries & Constraints

  • The implementation should follow the existing pattern used by LimitOperator and similar operators
  • The UnionOperator class is located at python/ray/data/_internal/execution/operators/union_operator.py
  • Add a unit test to verify that throttling_disabled() returns True. The test should create a minimal physical DAG with a input buffer and union and assert that the union doesn't get allocated any resources.

Metadata

Metadata

Assignees

Labels

P2Important issue, but not time-criticaldataRay Data-related issues

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions