As far as I know, Flink is the only engine we have at the moment
that can produce equality deletes, and only equality deletes have this
specific problem. Since an equality delete can be written without
knowing whether rows are being updated or not, it is always ambiguous
whether a new row is an updated row, a newly added row, or a fresh row
appended after an earlier row was deleted.

I think in this case we need to ignore row_versioning and just give every
new row a brand new identifier. For a reader this means every update looks
like a "delete" plus an "add", never an "update". For other processes (COW
and position deletes) we only mark records as deleted or updated after
finding them first, which makes it easy to take the lineage identifier from
the source record and change it. For Spark, we just kept working on engine
improvements (like SPJ, dynamic partition pushdown) to try to make that
scan and join faster, but that approach probably still means somewhat
higher latency.

I think we could theoretically resolve equality deletes into updates at
compaction time again but only if the user first defines accurate "row
identity" columns because otherwise we have no way of determining whether
rows were updated or not. This is basically the issue we have now in the
CDC procedures. Ideally, I think we need to find a way to have Flink locate
updated rows at runtime using some better indexing structure or something
like that as you suggested.
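
As a rough sketch of what resolving at compaction time could look like,
assuming the user has declared identity columns: the function name, the
dict-based rows, and the row_id and version keys below are all hypothetical
and not part of any existing Iceberg API.

```python
def resolve_lineage(deleted, added, identity_cols):
    """Match newly added rows against rows removed by equality deletes.

    deleted, added: lists of dicts, each with "row_id" and "version" keys.
    identity_cols: user-declared columns that define row identity.
    Returns the added rows, with lineage inherited where a match exists.
    """
    def identity(row):
        return tuple(row[c] for c in identity_cols)

    old_by_identity = {identity(r): r for r in deleted}
    resolved = []
    for row in added:
        old = old_by_identity.get(identity(row))
        if old is not None:
            # Treat as an update: keep the old identifier, bump the version.
            row = {**row, "row_id": old["row_id"], "version": old["version"] + 1}
        resolved.append(row)
    return resolved
```

Without accurate identity columns there is nothing to join on, and even with
them this sketch treats a delete followed by an unrelated re-insert of the
same key as an update.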

On Sat, Aug 17, 2024 at 1:07 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Russell,
>
> As discussed offline, this would be very hard to implement with the
> current Flink CDC write strategies. I think this is true for every
> streaming writer.
>
> For tracking the previous version of the row, the streaming writer would
> need to scan the table. It needs to be done for every record to find the
> previous version. This could be possible if the data would be stored in a
> way which supports fast queries on the primary key, like LSM Tree (see:
> Paimon [1]), otherwise it would be prohibitively costly, and unfeasible for
> higher loads. So adding a new storage strategy could be one solution.
>
> Alternatively we might find a way for the compaction to update the lineage
> fields. We could provide a way to link the equality deletes to the new rows
> which updated them during write, then on compaction we could update the
> lineage fields based on this info.
>
> Are there any better ideas with Spark streaming which we can adopt?
>
> Thanks,
> Peter
>
> [1] - https://paimon.apache.org/docs/0.8/
>
> On Sat, Aug 17, 2024, 01:06 Russell Spitzer <russell.spit...@gmail.com>
> wrote:
>
>> Hi Y'all,
>>
>> We've been working on a new proposal to add Row Lineage to Iceberg in the
>> V3 Spec. The general idea is to give every row a unique identifier as well
>> as a marker of what version of the row it is. This should let us build a
>> variety of features related to CDC, Incremental Processing and Audit
>> Logging. If you are interested please check out the linked proposal below.
>> This will require compliance from all engines to be really useful so it's
>> important we come to consensus on whether or not this is possible.
>>
>>
>> https://docs.google.com/document/d/146YuAnU17prnIhyuvbCtCtVSavyd5N7hKryyVRaFDTE/edit?usp=sharing
>>
>>
>> Thank you for your consideration,
>> Russ
>>
>
