As far as I know, Flink is currently the only engine we have that produces equality deletes, and only equality deletes have this specific problem. Since an equality delete can be written without knowing whether rows are actually being updated, it is always ambiguous whether a new row is an updated row, a newly added row, or whether an old row was deleted and an unrelated new row was appended.
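
To make that concrete, here is a rough, hypothetical sketch of what a "blind" equality-delete upsert does (plain Java, not the real Iceberg or Flink APIs; the _row_id / _last_updated_sequence_number names just mirror the proposal, and emitEqualityDelete/emitDataRow are made-up stand-ins for the file writers). The writer never reads the existing table, so it has no old row to take a lineage identifier from:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: why equality-delete upserts cannot carry lineage forward.
class BlindUpsertWriter {
  private final String keyColumn;          // e.g. the Flink primary key column
  private long nextRowId;                  // hypothetically handed out per commit
  private final long commitSequenceNumber; // sequence number of the commit being written

  BlindUpsertWriter(String keyColumn, long firstRowId, long commitSequenceNumber) {
    this.keyColumn = keyColumn;
    this.nextRowId = firstRowId;
    this.commitSequenceNumber = commitSequenceNumber;
  }

  void upsert(Map<String, Object> incoming) {
    // 1. Emit an equality delete on the key. We do NOT know whether a matching
    //    row exists, so we cannot tell an update apart from a plain insert.
    emitEqualityDelete(keyColumn, incoming.get(keyColumn));

    // 2. Emit the new row with a brand new identity. There is no source row in
    //    hand to copy a lineage identifier from, so every "update" surfaces to a
    //    reader as a delete of the old row plus an add of an unrelated new one.
    Map<String, Object> row = new HashMap<>(incoming);
    row.put("_row_id", nextRowId++);
    row.put("_last_updated_sequence_number", commitSequenceNumber);
    emitDataRow(row);
  }

  private void emitEqualityDelete(String column, Object value) { /* write to the equality delete file */ }
  private void emitDataRow(Map<String, Object> row) { /* write to the data file */ }
}
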
I think in this case we need to ignore row_versioning and just give every new row a brand new identifier. For a reader, this means every update looks like a "delete" plus an "add", and there are no "updates".

For the other processes (copy-on-write and position deletes) we only mark records as deleted or updated after finding them first, which makes it easy to take the lineage identifier from the source record and carry it forward onto the new version. For Spark, we just kept working on engine improvements (like SPJ and dynamic partition pushdown) to make that scan and join faster, but it probably still comes with somewhat higher latency.

I think we could theoretically resolve equality deletes into updates at compaction time, but only if the user first defines accurate "row identity" columns, because otherwise we have no way of determining whether rows were updated or not. This is basically the issue we have now in the CDC procedures. Ideally, I think we need to find a way to have Flink locate updated rows at runtime using some better indexing structure, or something like that as you suggested.

On Sat, Aug 17, 2024 at 1:07 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Russell,
>
> As discussed offline, this would be very hard to implement with the
> current Flink CDC write strategies. I think this is true for every
> streaming writer.
>
> For tracking the previous version of the row, the streaming writer would
> need to scan the table for every record to find its previous version.
> This could be possible if the data were stored in a way that supports
> fast queries on the primary key, like an LSM tree (see: Paimon [1]);
> otherwise it would be prohibitively costly and unfeasible for higher
> loads. So adding a new storage strategy could be one solution.
>
> Alternatively, we might find a way for compaction to update the lineage
> fields. We could provide a way to link the equality deletes to the new
> rows which updated them during the write, and then on compaction we could
> update the lineage fields based on this info.
>
> Are there any better ideas from Spark streaming which we can adopt?
>
> Thanks,
> Peter
>
> [1] - https://paimon.apache.org/docs/0.8/
>
> On Sat, Aug 17, 2024, 01:06 Russell Spitzer <russell.spit...@gmail.com>
> wrote:
>
>> Hi Y'all,
>>
>> We've been working on a new proposal to add Row Lineage to Iceberg in the
>> V3 spec. The general idea is to give every row a unique identifier as well
>> as a marker of what version of the row it is. This should let us build a
>> variety of features related to CDC, incremental processing, and audit
>> logging. If you are interested, please check out the linked proposal below.
>> This will require compliance from all engines to be really useful, so it's
>> important we come to consensus on whether or not this is possible.
>>
>> https://docs.google.com/document/d/146YuAnU17prnIhyuvbCtCtVSavyd5N7hKryyVRaFDTE/edit?usp=sharing
>>
>> Thank you for your consideration,
>> Russ