[SPARK-57134][SDP] Implement SCD2 Batch Processor; Preprocess Microbatch#56208
[SPARK-57134][SDP] Implement SCD2 Batch Processor; Preprocess Microbatch#56208AnishMahto wants to merge 3 commits into
Conversation
e22e3d8 to
67c76a8
Compare
| /** | ||
| * Concept: run of upsert events. | ||
| * | ||
| * A run is a maximal sequence of consecutive upsert events (in sorted order by sequencing) |
There was a problem hiding this comment.
Just a heads up; I explain a bunch of concepts in this scaladoc so readers have context on the startAt, endAt, and recordStartAt columns I introduce below, but none of these concepts are actually actively used in this PR.
jose-torres
left a comment
There was a problem hiding this comment.
In general, it's a bit hard to understand the concrete abstraction that this PR implements: it does suchandsuch list of transformations, and does it correctly as far as I can tell, but how do we know it's the right list? Since this is all inside a new component, I'm OK with proceeding as is (after handling the duplicates question), but the structure may mean we have to go back and revisit parts of this in future PRs.
| * | ||
| * Step ordering is load-bearing: the row-extension steps reference user data columns that | ||
| * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key | ||
| * deduplication step is needed - SCD2 preserves every event as part of the row's history. |
There was a problem hiding this comment.
Does it also preserve full-event duplicates (which would eventually map to a START == END row)?
| Row(1, null, 20L, 20L, Row(20L)) | ||
| ) | ||
| ) | ||
| } |
There was a problem hiding this comment.
Since the SCD1 equivalent preprocessing drops duplicates, I think we should have a case explicitly confirming that duplicates are not dropped here. (Or perhaps confirming that full row duplicates are dropped, if that's the intended behavior.)
What changes were proposed in this pull request?
Preamble:
The SCD type 2 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD2 replication semantics.
SCD2 flows also maintain an "auxiliary" table to keep track of early-arriving/out-of-order received events state. Each microbatch will need to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.
Preprocess Microbatch
For SCD2, preprocessing the microbatch is all about getting it in the right shape, aligned with the shape of the target table the microbatch will be merged into + the shape that SCD2 itself as a standard demands.
That is:
The microbatch must have a start-at and end-at columns projected as per the SCD2 standard, to indicate that a historical/alive record was active between those sequence stamps
The microbatch will have the operational CDC metadata column projected, which is needed to reconcile late arriving events/bookkeeping
As per the Spark AutoCDC API, the microbatch should project down to just the user-specified column selection
Implement the part of the core SCD2 microbatch processor that does this microbatch preprocessing.
Why are the changes needed?
To support AutoCDC SCD2 transformations, as per the approved SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
Does this PR introduce any user-facing change?
No. New feature.
How was this patch tested?
Scd2BatchProcessorSuiteWas this patch authored or co-authored using generative AI tooling?
Co-written with Claude Opus 4.7