stream: add gather operator and tighten coverage #2824

Draft
He-Pin wants to merge 2 commits into apache:main from He-Pin:hepin-gather-statefulmap-coverage

Conversation

He-Pin (Member) commented Mar 29, 2026

Motivation

Add the gather operator across the Scala and Java DSLs, tighten its execution semantics, and broaden its statefulMap-equivalent regression coverage.

This PR also includes the hot-path and backpressure fixes found during review, plus benchmark and documentation support for the new operator.

Modification

  • add gather API support in the Scala and Java DSLs
  • implement the Gather stage and supporting Gatherer / GatherCollector APIs
  • add Scala, Java, and docs examples for the operator
  • add FlowGatherSpec plus Java parity coverage
  • add JMH coverage for zipWithIndex-style workloads
  • optimize the public single-output gather hot path
  • fix the OneToOneGatherer backpressure issue found during review
  • expand tests toward statefulMap-equivalent scenarios such as happy-path state retention, restart, stop, and backpressure
  • correct the new gather API @since markers to 2.0.0
  • document which statefulMap scenarios map directly to gather and which null-state cases do not translate one-to-one
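
As a rough illustration of the operator shape the list above describes, the following plain-Java sketch models a gather-style stage as a stateful object that receives each element together with a collector and may push zero or more outputs into it. The names `SketchGatherer`, `SketchCollector`, `ZipWithIndex`, `Pairwise`, and `runGather` are hypothetical and chosen for illustration only; they are not the `Gatherer` / `GatherCollector` signatures added by this PR, and no Pekko classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class GatherSketch {
    /** Hypothetical downstream handle; NOT the PR's GatherCollector. */
    interface SketchCollector<O> {
        void push(O element);
    }

    /** Hypothetical stateful gatherer; NOT the PR's Gatherer. */
    interface SketchGatherer<I, O> {
        void apply(I element, SketchCollector<O> out);

        /** Called once after the last element; may flush buffered state. */
        default void onComplete(SketchCollector<O> out) {}
    }

    /** One-to-one case: tags each element with a running index. */
    static final class ZipWithIndex<T> implements SketchGatherer<T, String> {
        private long index = 0L;

        @Override
        public void apply(T element, SketchCollector<String> out) {
            out.push(element + "@" + index);
            index++;
        }
    }

    /** Many-to-fewer case: joins elements in twos, flushing a leftover at the end. */
    static final class Pairwise implements SketchGatherer<String, String> {
        private String pending = null;

        @Override
        public void apply(String element, SketchCollector<String> out) {
            if (pending == null) {
                pending = element;
            } else {
                out.push(pending + "+" + element);
                pending = null;
            }
        }

        @Override
        public void onComplete(SketchCollector<String> out) {
            if (pending != null) {
                out.push(pending);
            }
        }
    }

    /** Drives a fresh gatherer over a list; a stand-in for one stream materialization. */
    static <I, O> List<O> runGather(List<I> input, Supplier<SketchGatherer<I, O>> factory) {
        List<O> result = new ArrayList<>();
        SketchGatherer<I, O> gatherer = factory.get();
        SketchCollector<O> collector = result::add;
        for (I element : input) {
            gatherer.apply(element, collector);
        }
        gatherer.onComplete(collector);
        return result;
    }

    public static void main(String[] args) {
        Supplier<SketchGatherer<String, String>> indexed = ZipWithIndex::new;
        Supplier<SketchGatherer<String, String>> paired = Pairwise::new;
        System.out.println(runGather(List.of("a", "b", "c"), indexed)); // [a@0, b@1, c@2]
        System.out.println(runGather(List.of("a", "b", "c"), paired));  // [a+b, c]
    }
}
```

The factory is a `Supplier` so that each run starts from fresh state, which is the property the state-retention and restart tests in the list would exercise. The sketch ignores demand entirely, so it says nothing about the backpressure behavior the PR fixes.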

Result

  • gather is documented and available end-to-end in Scala and Java DSLs
  • correctness coverage is materially stronger, especially around supervision and backpressure
  • the PR now documents the intended release version (2.0.0) for the new APIs
  • local verification passed:
    • sbt --no-colors 'scalafmtAll' 'stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowGatherSpec' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.FlowTest' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.SourceTest' 'docs/test:compile' 'bench-jmh/Jmh/compile'
  • the corrected local JMH run indicates gather-based zipWithIndex is now near parity with statefulMap
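
The supervision scenarios mentioned in this PR (restart, stop) can be pictured with a small plain-Java sketch. It assumes gather follows the convention commonly used by statefulMap-style operators: on resume the failing element is dropped and state is kept, on restart the failing element is dropped and state is re-created, and on stop the failure ends the run. That assumption is not confirmed by the text here; FlowGatherSpec is the authority on the actual behavior. `SupervisionSketch`, `Directive`, `Counter`, and `run` are hypothetical names, not Pekko APIs.

```java
import java.util.ArrayList;
import java.util.List;

class SupervisionSketch {
    /** Hypothetical stand-in for a supervision decision. */
    enum Directive { STOP, RESUME, RESTART }

    /** Hypothetical stateful step: emits a running count and fails on "boom". */
    static final class Counter {
        private int seen = 0;

        String apply(String element) {
            if (element.equals("boom")) {
                throw new IllegalStateException("bad element");
            }
            seen++;
            return element + "#" + seen;
        }
    }

    static List<String> run(List<String> input, Directive directive) {
        List<String> output = new ArrayList<>();
        Counter state = new Counter();
        for (String element : input) {
            try {
                output.add(state.apply(element));
            } catch (IllegalStateException ex) {
                switch (directive) {
                    case RESUME:
                        break; // drop the element, keep the accumulated state
                    case RESTART:
                        state = new Counter(); // drop the element, start from fresh state
                        break;
                    default:
                        throw ex; // STOP: the failure ends the whole run
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "boom", "c");
        System.out.println(run(input, Directive.RESUME));  // [a#1, b#2, c#3]
        System.out.println(run(input, Directive.RESTART)); // [a#1, b#2, c#1]
    }
}
```

The difference between the two printed lists is what a state-retention test versus a restart test would observe: the count continues after resume but starts over after restart.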

References

  • stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
  • stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
  • stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
  • stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
  • docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md

He-Pin and others added 2 commits March 29, 2026 23:46
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage.

Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review.

Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap.

Co-authored-by: Copilot <[email protected]>

Motivation: follow-up review and documentation work for the new gather operator.

Modification: correct the new gather API @since annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling.

Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly.

Co-authored-by: Copilot <[email protected]>