Skip to content

[SPARK-56664][SS][RTM][StreamingShuffle][Part3] Add StreamingShuffleManager and MultiShuffleManager#56196

Open
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:stack/streaming-shuffle-pr3-managers
Open

[SPARK-56664][SS][RTM][StreamingShuffle][Part3] Add StreamingShuffleManager and MultiShuffleManager#56196
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:stack/streaming-shuffle-pr3-managers

Conversation

@jerrypeng
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This is part 3 of a multi-PR effort to add streaming shuffle to Spark — a push-based shuffle used by Real-Time Mode (RTM) structured streaming, where writer tasks push records
directly to reader tasks over the network instead of writing map output to disk for readers to pull.

This PR adds the shuffle-manager layer that later PRs plug into:

  • StreamingShuffleManager — a ShuffleManager implementation for streaming shuffle. getWriter/getReader are intentionally stubbed in this PR (they throw
    UnsupportedOperationException) and are implemented in the push-path / pull-path PRs that follow.
  • MultiShuffleManager — routes each shuffle to either the batch SortShuffleManager or the StreamingShuffleManager, based on a per-query local property, so a single application
    can mix batch and streaming shuffle.
  • TaskContextAwareLogging — a Logging mixin that prefixes log lines with queryId / shuffleId / stageId / taskId.
  • SparkEnv — exposes the StreamingShuffleOutputTracker (added in part 2) to executors, and initializes it only when the configured shuffle manager is StreamingShuffleManager
    or MultiShuffleManager.
  • Two streaming-shuffle error conditions (STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER, STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE) and the STREAMING_QUERY_ID log key.

The full PR stack:

  • Part 1 (SPARK-56674, merged) — streaming shuffle wire protocol (Netty messages).
  • Part 2 (SPARK-56962, merged) — StreamingShuffleOutputTracker (driver-side writer-location coordination).
  • Part 3 (this PR) — shuffle-manager layer (StreamingShuffleManager + MultiShuffleManager), logging mixin, and SparkEnv tracker wiring.
  • Part 4StreamingShuffleWriter + server-side Netty handler (push path).
  • Part 5StreamingShuffleReader + client-side Netty handler (pull path).
  • Part 6 — register streaming shuffles with the tracker in DAGScheduler (activation).
  • Part 7 — end-to-end StreamingShuffleSuite.
  • Part 8 — documentation.

Why are the changes needed?

Real-Time Mode / low-latency continuous queries need shuffle data to flow continuously between stages. The default sort shuffle (write map output to disk, then have reducers pull it) adds
latency that is unacceptable for these workloads. Streaming shuffle instead pushes records directly from writer tasks to reader tasks.

This PR lands the manager layer that the writer and reader implementations attach to, plus MultiShuffleManager so batch stages keep using the sort shuffle while streaming stages use the
streaming shuffle within the same application.

Does this PR introduce any user-facing change?

No. The new shuffle managers are opt-in via spark.shuffle.manager and are not the default; getWriter/getReader are still stubbed in this PR, so the feature is not yet usable
end-to-end (completed in later PRs). The StreamingShuffleOutputTracker is initialized only when one of the new managers is configured, so there is no change to the default (sort
shuffle) path — this is covered by tests.

How was this patch tested?

New unit suites:

  • StreamingShuffleManagerSuitegetWriterId for data/termination messages and the unexpected-message-type error; getQueryId resolution and failure; registerShuffle handle
    type; and SparkEnv gating (tracker is present for StreamingShuffleManager, absent for the default manager).
  • MultiShuffleManagerSuite — per-query streaming-vs-batch routing, the enable property, and SparkEnv gating for MultiShuffleManager.

13 tests, all passing. SparkThrowableSuite validates the two new error conditions.

Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Code (Claude Opus 4.8)

…er, and logging mixin

Introduces the streaming shuffle manager layer:
* StreamingShuffleManager - ShuffleManager implementation for streaming
  shuffle. getWriter/getReader are stubbed here and implemented in the
  follow-up push-path and pull-path PRs.
* MultiShuffleManager - routes each shuffle to either the batch
  (SortShuffleManager) or streaming manager.
* TaskContextAwareLogging - logging mixin that prefixes queryId /
  shuffleId / stageId / taskId.
* SparkEnv - expose the (already-merged) StreamingShuffleOutputTracker to
  executors when the configured shuffle manager is the streaming or multi
  manager.
* Streaming shuffle error conditions and the STREAMING_QUERY_ID log key.
* StreamingShuffleManagerSuite and MultiShuffleManagerSuite covering the
  manager APIs, routing, and the SparkEnv tracker-initialization gating.

Co-authored-by: Isaac
@jerrypeng jerrypeng force-pushed the stack/streaming-shuffle-pr3-managers branch from 4fdfa00 to e7fb66b Compare May 29, 2026 07:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant