Hi Aditya,

See my answers below:
Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote (on Sat, Apr 20, 2024, 11:05):

> Hi Péter, Gabor,
>
> Thanks a lot for clarifying and providing additional information. I had a
> few follow-up queries:
>
> 1. We want to ingest a CDC stream using Flink into an Iceberg sink. If we
> have a RETRACT-like CDC stream that uniquely distinguishes between INSERT /
> UPDATE (for example, containing events as defined in
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/),
> is there any mode in Flink-Iceberg ingestion today that does not create a
> delete for INSERT events (and only creates deletes for UPDATE / DELETE)?

You can use `upsert(false)`, or just leave it out, as the default is false. This will create an equality delete for every `-D` and `-U` event, and a positional delete if there is an insert for the same key in the same checkpoint.
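For the first question, the sink wiring would look roughly like this (an untested sketch against the iceberg-flink `FlinkSink` builder; the table location, the `id` equality column, and the helper method are just placeholders for illustration):

import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class CdcToIceberg {

  // Wire a RETRACT-style CDC stream of RowData (+I/-U/+U/-D) into an Iceberg table.
  // With upsert(false) (the default), only -D and -U records produce equality
  // deletes; plain +I records are written as data without a paired delete.
  static void appendCdcStream(DataStream<RowData> cdcStream, String tableLocation) {
    // Placeholder loader; use fromCatalog(...) for catalog-managed tables.
    TableLoader tableLoader = TableLoader.fromHadoopTable(tableLocation);

    FlinkSink.forRowData(cdcStream)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("id")) // primary key column(s)
        .upsert(false) // spelled out for clarity; false is the default
        .append();
  }
}

Switching that flag to `upsert(true)` is what triggers the unconditional equality delete per insert that Gabor described below.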
> 2. I was browsing through Iceberg compaction and found that there is an
> interface defined, ConvertEqualityDeleteFiles.java
> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>,
> which might make reads more efficient and may be more cost effective than
> a full compaction, but it looks like ConvertEqualityDeleteFiles.java is
> not implemented in either Spark or Flink. Do you know if there are plans
> to implement this interface in Spark or Flink?

I remember the discussion around it, and this conversion is questionable. To convert the equality deletes to positional deletes, we need to scan the whole table. That is already half of the IO, so maybe rewriting the whole table is better in this case, especially since after the conversion we still have to apply the positional deletes during reads.

> Regards,
> Aditya
>
>
> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Aditya,
>>
>> The definition of UPSERT is that we have 2 types of messages:
>> - DELETE - we need to remove the old record with the given id.
>> - UPSERT - we need to remove the old version of the record based on the
>> id, and add a new version.
>>
>> See:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>>
>> This type of stream uses fewer events than the RETRACT stream when there
>> are many updates. In a RETRACT stream we have 2 types of records:
>> - DELETE - we need to remove the old record with the given id.
>> - INSERT - insert a new record.
>>
>> So when we have an UPSERT stream, we can't be sure that there is no
>> previous record for the given id; we either have to scan the whole table
>> to find the previous version, or always create the equality delete to
>> remove it.
>>
>> When the Flink sink was implemented, the decision was to avoid scanning
>> and write out the equality delete every time. As Gabor already mentioned,
>> it is recommended to compact the table to remove the equality deletes and
>> enhance the read performance. Currently we have Spark compaction ready,
>> and we are working on the Flink compaction too. See:
>> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>
>> I hope this helps,
>> Peter
>>
>> Gabor Kaszab <gaborkas...@apache.org> wrote (on Thu, Apr 18, 2024, 9:40):
>>
>>> Hey,
>>>
>>> I had the chance to explore this area of eq-deletes recently myself too.
>>> Apparently, this behavior is by design in Flink. The reason why it
>>> unconditionally writes an eq-delete for each insert (only in upsert
>>> mode, though) is to guarantee the uniqueness of the primary key. So it
>>> drops the previous row with the same PK with a new eq-delete and then
>>> adds the new row in a new data file. Unfortunately this happens
>>> unconditionally, so no reading happens, and even if there was no row
>>> previously with the given PK, Flink will write an eq-delete for it
>>> anyway. Yes, this can hurt read performance, so I guess the advised best
>>> practice is to compact your table frequently.
>>>
>>> Gabor
>>>
>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta
>>> <adigu...@linkedin.com.invalid> wrote:
>>>
>>>> Hi all,
>>>>
>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>>>> record with a new equality field id, then an equality delete file is
>>>> also created with the corresponding entry. For example, I executed the
>>>> following commands in Flink SQL with Apache Iceberg:
>>>>
>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>   `id` INT UNIQUE COMMENT 'unique id',
>>>>   `data` STRING NOT NULL,
>>>>   PRIMARY KEY(`id`) NOT ENFORCED
>>>> ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
>>>>
>>>> Now I inserted a record:
>>>>
>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>>
>>>> It resulted in 2 files.
>>>>
>>>> Data file content:
>>>>
>>>> {"id":7,"data":"new value"}
>>>>
>>>> But it also created an equality delete file:
>>>>
>>>> {"id":7}
>>>>
>>>> I expect that it will create a delete file entry for UPDATE / DELETE
>>>> but not for INSERT, as that might lead to performance degradation for
>>>> reads on CDC tables, right?
>>>>
>>>> Is it expected that fresh INSERTs will also have equality delete
>>>> entries? If yes, what is the benefit of having an equality delete entry
>>>> for INSERTs?
>>>>
>>>> Regards,
>>>> Aditya