[SPARK-57134][SDP] Implement SCD2 Batch Processor; Preprocess Microbatch#56208

Open
AnishMahto wants to merge 3 commits into apache:master from AnishMahto:SPARK-57134-SCD2-preprocess-microbatch

@AnishMahto AnishMahto commented May 29, 2026

What changes were proposed in this pull request?

Preamble:

The SCD type 2 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD2 replication semantics.

SCD2 flows also maintain an "auxiliary" table that tracks the state of early-arriving and out-of-order events. Each microbatch must reconcile against this auxiliary table as well, and update its state appropriately for future microbatches.

Preprocess Microbatch

For SCD2, preprocessing the microbatch means putting it into the right shape: the shape of the target table the microbatch will be merged into, plus the columns that SCD2 itself requires as a standard.

That is:

- The microbatch must have start-at and end-at columns projected, as per the SCD2 standard, to indicate that a historical or live record was active between those sequence stamps.
- The microbatch must have the operational CDC metadata column projected, which is needed to reconcile late-arriving events and for bookkeeping.
- As per the Spark AutoCDC API, the microbatch should project down to just the user-specified column selection.

This PR implements the part of the core SCD2 microbatch processor that performs this preprocessing.
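
The three shape requirements above can be sketched in plain Python, with rows modeled as dicts rather than Spark DataFrames. Everything named here is hypothetical: the column names `__start_at`, `__end_at`, and `__cdc_meta`, the `seq` and `op` input columns, and the rule used to fill the start and end stamps are illustrative assumptions, not this PR's implementation. The point is only the ordering: the extension steps read columns that the final selection may drop.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

# Hypothetical names for the projected system columns.
SYSTEM_COLUMNS = ["__start_at", "__end_at", "__cdc_meta"]


def preprocess_microbatch(batch: List[Row], selected_columns: List[str]) -> List[Row]:
    """Toy model of SCD2 microbatch preprocessing under assumed semantics."""
    shaped = []
    for row in batch:
        extended = dict(row)
        # Steps 1 and 2 (assumed rule): stamp start/end from the sequencing
        # column and leave the interval open; carry the CDC metadata along.
        extended["__start_at"] = row["seq"]
        extended["__end_at"] = None
        extended["__cdc_meta"] = {"seq": row["seq"], "op": row["op"]}
        # Step 3 runs last: selection may drop "seq" and "op", which the
        # steps above still needed to read.
        keep = list(selected_columns) + SYSTEM_COLUMNS
        shaped.append({name: extended[name] for name in keep})
    return shaped


batch = [
    {"id": 1, "value": "a", "seq": 10, "op": "upsert"},
    {"id": 1, "value": None, "seq": 20, "op": "delete"},
]
result = preprocess_microbatch(batch, selected_columns=["id", "value"])
```

Here `result` keeps both events, each carrying the three system columns, while `seq` and `op` survive only inside `__cdc_meta`.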

Why are the changes needed?

To support AutoCDC SCD2 transformations, as per the approved SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7

Does this PR introduce any user-facing change?

No. New feature.

How was this patch tested?

Scd2BatchProcessorSuite

Was this patch authored or co-authored using generative AI tooling?

Co-written with Claude Opus 4.7

@AnishMahto AnishMahto force-pushed the SPARK-57134-SCD2-preprocess-microbatch branch from e22e3d8 to 67c76a8 Compare May 29, 2026 16:49
@AnishMahto
Contributor Author

@jose-torres

Comment on lines +142 to +145
/**
* Concept: run of upsert events.
*
* A run is a maximal sequence of consecutive upsert events (in sorted order by sequencing)
Contributor Author

Just a heads up: I explain a bunch of concepts in this scaladoc so readers have context on the startAt, endAt, and recordStartAt columns I introduce below, but none of these concepts are actually used in this PR.
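
As a rough illustration of the "run of upsert events" concept quoted above, the sketch below splits one key's events, sorted by sequence, into maximal runs of consecutive upserts. The event shape (`seq`, `op`) and the assumption that any non-upsert event (for example a delete) ends a run are illustrative guesses; the quoted scaladoc is truncated, so the PR's full definition may differ.

```python
from itertools import groupby
from typing import Any, Dict, List

Event = Dict[str, Any]


def upsert_runs(events: List[Event]) -> List[List[Event]]:
    """Group one key's events into maximal runs of consecutive upserts."""
    ordered = sorted(events, key=lambda e: e["seq"])
    runs = []
    # groupby only merges adjacent items, so each group is a maximal stretch
    # of events that are all upserts or all non-upserts.
    for is_upsert, group in groupby(ordered, key=lambda e: e["op"] == "upsert"):
        if is_upsert:
            runs.append(list(group))
    return runs


events = [
    {"seq": 30, "op": "upsert"},
    {"seq": 10, "op": "upsert"},
    {"seq": 20, "op": "upsert"},
    {"seq": 40, "op": "delete"},
    {"seq": 50, "op": "upsert"},
]
run_seqs = [[e["seq"] for e in run] for run in upsert_runs(events)]
# run_seqs == [[10, 20, 30], [50]]
```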

Contributor

@jose-torres jose-torres left a comment

In general, it's a bit hard to understand the concrete abstraction that this PR implements: it applies such-and-such list of transformations, and does so correctly as far as I can tell, but how do we know it's the right list? Since this is all inside a new component, I'm OK with proceeding as is (after handling the duplicates question), but the structure may mean we have to go back and revisit parts of this in future PRs.

*
* Step ordering is load-bearing: the row-extension steps reference user data columns that
* target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key
* deduplication step is needed - SCD2 preserves every event as part of the row's history.
Contributor

Does it also preserve full-event duplicates (which would eventually map to a START == END row)?
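
To make the question concrete: if history intervals are later derived by closing each event at the next event's sequence value, a full-event duplicate yields a row whose start equals its end. The sketch below assumes exactly that derivation, which this PR does not implement; the function name and the tuple layout are hypothetical.

```python
from typing import List, Optional, Tuple

Interval = Tuple[int, Optional[int]]


def to_intervals(seqs: List[int]) -> List[Interval]:
    """Close each event at the next event's sequence; the last stays open."""
    ordered = sorted(seqs)
    intervals = []
    for i, start in enumerate(ordered):
        end = ordered[i + 1] if i + 1 < len(ordered) else None
        intervals.append((start, end))
    return intervals


# One key's sequence values, with the event at 20 delivered twice.
intervals = to_intervals([10, 20, 20, 30])
zero_length = [(s, e) for s, e in intervals if s == e]
# intervals   == [(10, 20), (20, 20), (20, 30), (30, None)]
# zero_length == [(20, 20)]
```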

Row(1, null, 20L, 20L, Row(20L))
)
)
}
Contributor

Since the SCD1 equivalent preprocessing drops duplicates, I think we should have a case explicitly confirming that duplicates are not dropped here. (Or perhaps confirming that full row duplicates are dropped, if that's the intended behavior.)
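
The kind of test case being requested could look roughly like the following plain-Python sketch. It contrasts a hypothetical SCD1-style step that keeps only the latest event per key with a hypothetical SCD2-style step that passes every event through, duplicates included. Neither function is the PR's code, and whether full-row duplicates should survive is exactly the open question here.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def scd1_dedupe(batch: List[Row]) -> List[Row]:
    """Hypothetical SCD1-style step: keep the latest event per key."""
    latest: Dict[Any, Row] = {}
    for row in batch:
        current = latest.get(row["id"])
        if current is None or row["seq"] >= current["seq"]:
            latest[row["id"]] = row
    return list(latest.values())


def scd2_passthrough(batch: List[Row]) -> List[Row]:
    """Hypothetical SCD2-style step: every event is part of the history."""
    return [dict(row) for row in batch]


batch = [
    {"id": 1, "value": "a", "seq": 10},
    {"id": 1, "value": "b", "seq": 20},
    {"id": 1, "value": "b", "seq": 20},  # full-row duplicate
]
assert len(scd1_dedupe(batch)) == 1       # collapsed to one row per key
assert len(scd2_passthrough(batch)) == 3  # duplicates are not dropped
```

If the intended behavior is instead to drop full-row duplicates, the second assertion would expect 2 rows.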
