[SPARK-56664][SS][RTM][StreamingShuffle][Part3] Add StreamingShuffleManager and MultiShuffleManager#56196
Open
jerrypeng wants to merge 1 commit into
Open
Conversation
…er, and logging mixin Introduces the streaming shuffle manager layer: * StreamingShuffleManager - ShuffleManager implementation for streaming shuffle. getWriter/getReader are stubbed here and implemented in the follow-up push-path and pull-path PRs. * MultiShuffleManager - routes each shuffle to either the batch (SortShuffleManager) or streaming manager. * TaskContextAwareLogging - logging mixin that prefixes queryId / shuffleId / stageId / taskId. * SparkEnv - expose the (already-merged) StreamingShuffleOutputTracker to executors when the configured shuffle manager is the streaming or multi manager. * Streaming shuffle error conditions and the STREAMING_QUERY_ID log key. * StreamingShuffleManagerSuite and MultiShuffleManagerSuite covering the manager APIs, routing, and the SparkEnv tracker-initialization gating. Co-authored-by: Isaac
4fdfa00 to
e7fb66b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This is part 3 of a multi-PR effort to add streaming shuffle to Spark — a push-based shuffle used by Real-Time Mode (RTM) structured streaming, where writer tasks push records
directly to reader tasks over the network instead of writing map output to disk for readers to pull.
This PR adds the shuffle-manager layer that later PRs plug into:
StreamingShuffleManager— aShuffleManagerimplementation for streaming shuffle.getWriter/getReaderare intentionally stubbed in this PR (they throwUnsupportedOperationException) and are implemented in the push-path / pull-path PRs that follow.MultiShuffleManager— routes each shuffle to either the batchSortShuffleManageror theStreamingShuffleManager, based on a per-query local property, so a single applicationcan mix batch and streaming shuffle.
TaskContextAwareLogging— aLoggingmixin that prefixes log lines with queryId / shuffleId / stageId / taskId.SparkEnv— exposes theStreamingShuffleOutputTracker(added in part 2) to executors, and initializes it only when the configured shuffle manager isStreamingShuffleManageror
MultiShuffleManager.STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER,STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE) and theSTREAMING_QUERY_IDlog key.The full PR stack:
StreamingShuffleOutputTracker(driver-side writer-location coordination).StreamingShuffleManager+MultiShuffleManager), logging mixin, and SparkEnv tracker wiring.StreamingShuffleWriter+ server-side Netty handler (push path).StreamingShuffleReader+ client-side Netty handler (pull path).DAGScheduler(activation).StreamingShuffleSuite.Why are the changes needed?
Real-Time Mode / low-latency continuous queries need shuffle data to flow continuously between stages. The default sort shuffle (write map output to disk, then have reducers pull it) adds
latency that is unacceptable for these workloads. Streaming shuffle instead pushes records directly from writer tasks to reader tasks.
This PR lands the manager layer that the writer and reader implementations attach to, plus
MultiShuffleManagerso batch stages keep using the sort shuffle while streaming stages use thestreaming shuffle within the same application.
Does this PR introduce any user-facing change?
No. The new shuffle managers are opt-in via
spark.shuffle.managerand are not the default;getWriter/getReaderare still stubbed in this PR, so the feature is not yet usableend-to-end (completed in later PRs). The
StreamingShuffleOutputTrackeris initialized only when one of the new managers is configured, so there is no change to the default (sortshuffle) path — this is covered by tests.
How was this patch tested?
New unit suites:
StreamingShuffleManagerSuite—getWriterIdfor data/termination messages and the unexpected-message-type error;getQueryIdresolution and failure;registerShufflehandletype; and SparkEnv gating (tracker is present for
StreamingShuffleManager, absent for the default manager).MultiShuffleManagerSuite— per-query streaming-vs-batch routing, the enable property, and SparkEnv gating forMultiShuffleManager.13 tests, all passing.
SparkThrowableSuitevalidates the two new error conditions.Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Code (Claude Opus 4.8)