[
https://issues.apache.org/jira/browse/SPARK-56683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18086924#comment-18086924
]
Juliusz Sompolski commented on SPARK-56683:
-------------------------------------------
Hi [~yadavay]
I am working on guaranteed CTE reuse together with a couple of colleagues and
should have these changes up soon, so that DSv2 source materialization can be
built on top of it.

I originally worked on MergeIntoMaterializeSource a couple of years back, and
the localCheckpoint that it uses is a pain in a lot of ways: serialization and
deserialization are not performant, it drops out to RDD execution and so loses
a lot of SQL / AQE logic, and it needs very crude handling of retries if we
lose some RDD blocks. I would like to do it the proper way in DSv2 by using a
guaranteed reused shuffle, which would automatically pick up all the nice
features that shuffles have: a more performant format, automatic logic to know
what to recompute after failures based on checksums, etc.

I hoped to have it done for Spark 4.2 but didn't make it in time; it should be
ready and up in a PR soonish.
> MERGE INTO TABLE reads the source twice and the two reads can disagree
> leading to data inconsistency
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-56683
> URL: https://issues.apache.org/jira/browse/SPARK-56683
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Juliusz Sompolski
> Priority: Major
>
> RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that
> references the source query in two positions: once as the streamed input to
> the join that pairs source rows with target rows, and once inside a subquery
> that the rewrite uses to identify which rows or groups have matching source
> rows.
> The two positions are independent reads of the same source. When the source
> is non-deterministic — for example, a table with concurrent writers, a
> streaming source, or a query containing expressions like rand() — the two
> reads can observe different sets of rows. The MERGE result is then computed
> against an inconsistent picture of the source: rows can be filtered in or out
> by the subquery while the join sees a different set of rows, producing
> dropped, duplicated, or wrongly-matched rows.
> The two reads of the source need to be made consistent so that both positions
> in the rewritten plan see the same source data, regardless of source
> determinism.
> Delta Lake resolved this with
> [https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala]
> in their custom MERGE implementation, but DSv2 SupportsRowLevelOperations
> data sources remain exposed to this data inconsistency.
> This could be resolved with https://issues.apache.org/jira/browse/SPARK-56685
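
For illustration only, the failure mode described in the quoted issue can be
sketched in plain Python (this is not Spark code and not the planned fix). The
names read_source, merge_two_reads and merge_materialized are hypothetical, and
the "source" is assumed to be a read that returns a random subset of rows each
time it is evaluated, standing in for a rand() filter or a table with
concurrent writers.

```python
import random


def read_source(rng):
    """Hypothetical non-deterministic source: each read keeps a random subset of rows."""
    return [(k, f"v{k}") for k in range(10) if rng.random() < 0.5]


def merge_two_reads(target_keys, rng):
    """Mimics a rewrite that reads the source in two independent positions."""
    # Read 1: the subquery that decides which target keys have a matching source row.
    matched_keys = {k for k, _ in read_source(rng)}
    # Read 2: the streamed join input, an independent read of the same source.
    join_rows = dict(read_source(rng))
    # Target keys on which the two positions disagree about the source contents.
    return {k for k in target_keys if (k in matched_keys) != (k in join_rows)}


def merge_materialized(target_keys, rng):
    """Same logic, but the source is read once and reused in both positions."""
    snapshot = read_source(rng)  # single read, shared by subquery and join
    matched_keys = {k for k, _ in snapshot}
    join_rows = dict(snapshot)
    return {k for k in target_keys if (k in matched_keys) != (k in join_rows)}


target_keys = set(range(10))
disagreeing = merge_two_reads(target_keys, random.Random(0))
after_materialization = merge_materialized(target_keys, random.Random(0))
print("keys the two reads disagree on:", sorted(disagreeing))
print("keys disagreeing after materialization:", sorted(after_materialization))
```

With a single shared snapshot the set of disagreeing keys is empty by
construction, which is the property that source materialization (or a
guaranteed reused shuffle) is meant to provide.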