A very high change rate means it is most probably worth rewriting the data files from time to time. The high change rate itself causes write amplification, as every new version of a record is essentially a new row in the table (plus a tombstone for the old data). ConvertEqualityDeleteFiles will not change this. It only changes the form of the tombstone: instead of storing "delete every row where id = 10", we store "delete the n-th row of file x". Reading positional delete files is a bit more efficient, but if we rewrite the table data we get rid of both the old data and the tombstone. Converting the equality deletes also came up in the discussion on the table maintenance thread: https://lists.apache.org/thread/37k03vfmrr81o10d9zgdhpyc28mrrp9z
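For reference, this is roughly what such a periodic rewrite looks like with the Spark action. It is only a minimal sketch: it assumes the Iceberg Spark runtime is on the classpath, the table is already loaded, and the class name, the `spark`/`table` parameters and the option value are placeholders, not a prescribed setup.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class PeriodicRewrite {
      // `table` is assumed to be loaded already, e.g. from your catalog.
      public static void rewrite(SparkSession spark, Table table) {
        // Rewrites (compacts) the data files; the rewritten files no longer need
        // the old deletes applied at read time.
        RewriteDataFiles.Result result =
            SparkActions.get(spark)
                .rewriteDataFiles(table)
                .option("min-input-files", "2") // example value only, tune per table
                .execute();
        System.out.println("Rewritten data files: " + result.rewrittenDataFilesCount());
      }
    }

How often to schedule this is exactly the write-amplification tradeoff discussed above.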
On Mon, Apr 22, 2024 at 15:28, Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote:

> Hi Péter,
>
> Thanks for the detailed answers. We have some CDC streams with a very high change rate; for such tables we were thinking of leveraging ConvertEqualityDeleteFiles <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java> to convert eq deletes to pos deletes to improve the MoR performance, and to run it at a high frequency (say, every hour).
> Is the suggestion to instead run full compaction at a higher frequency to improve read performance?
> We think full compaction / rewriting may cause a lot of write amplification if run at a higher frequency for tables with a high change rate, so we wanted to check whether any analysis has been done on the tradeoffs of running the ConvertEqualityDeleteFiles procedure at a higher frequency vs. running compaction, or whether there are any other suggestions.
>
> Regards,
> Aditya
>
> On Mon, Apr 22, 2024 at 2:35 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Aditya,
>>
>> See my answers below:
>>
>> On Sat, Apr 20, 2024 at 11:05, Aditya Narayan Gupta <gupta.adityanara...@gmail.com> wrote:
>>
>>> Hi Péter, Gabor,
>>>
>>> Thanks a lot for clarifying and providing additional information. I had a few follow-up queries:
>>> 1. We want to ingest a CDC stream with Flink into an Iceberg sink. If we have a RETRACT-like CDC stream that uniquely distinguishes between INSERT / UPDATE (e.g. containing events as defined in https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/developer-guide/understand-flink-cdc-api/), is there any mode in Flink-Iceberg ingestion today that does not create a delete for INSERT events (and only creates deletes for UPDATE / DELETE)?
>>
>> You can use `upsert(false)`, or just leave it out, as the default is false.
>> This will create an equality delete for every `-D` and `-U` event, and a positional delete if there is an insert for the same key in the same checkpoint.
>>
>>> 2. I was browsing through Iceberg compaction and found that there is an interface defined, ConvertEqualityDeleteFiles.java <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/actions/ConvertEqualityDeleteFiles.java>, which might make reads more efficient and may be more cost-effective than a full compaction. It looks like the interface is not implemented in either Spark or Flink; do you know if there are plans to implement it in Spark or Flink?
>>
>> I remember the discussion around it, and this conversion is questionable. To convert the equality deletes to positional deletes, we need to scan the whole table. This is half of the IO, and maybe rewriting the whole table is better in this case. Especially since, after the conversion, we still have to apply the positional deletes during the read.
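For the `upsert(false)` path mentioned in the answer above, this is a minimal sketch of wiring the Flink sink, assuming the Iceberg Flink connector; the `CdcSink` class name, the `changelog` stream, the `loader`, and the `id` key column are placeholders.

    import java.util.Collections;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    public class CdcSink {
      // `changelog` is assumed to already carry the CDC row kinds (+I, -U, +U, -D);
      // `loader` points at the target Iceberg table.
      public static void build(DataStream<RowData> changelog, TableLoader loader) {
        FlinkSink.forRowData(changelog)
            .tableLoader(loader)
            .equalityFieldColumns(Collections.singletonList("id")) // assumed key column
            .upsert(false) // the default: deletes are written only for -D / -U rows
            .append();
      }
    }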
>>
>>
>>> Regards,
>>> Aditya
>>>
>>> On Thu, Apr 18, 2024 at 9:32 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Aditya,
>>>>
>>>> The definition of UPSERT is that we have two types of messages:
>>>> - DELETE - we need to remove the old record with the given id.
>>>> - UPSERT - we need to remove the old version of the record based on the id, and add a new version.
>>>>
>>>> See: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion
>>>>
>>>> This type of stream uses fewer events than a RETRACT stream when there are many updates. In a RETRACT stream we have two types of records:
>>>> - DELETE - we need to remove the old record with the given id.
>>>> - INSERT - insert a new record.
>>>>
>>>> So when we have an UPSERT stream, we can't be sure that there is no previous record for the given id; we either have to scan the whole table to find the previous version, or always create the equality delete to remove it.
>>>>
>>>> When the Flink sink was implemented, the decision was to avoid scanning and write out the equality delete every time. As Gabor already mentioned, it is recommended to compact the table to remove the equality deletes and enhance the read performance. Currently we have Spark compaction ready, and we are working on the Flink compaction too. See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>>
>>>> I hope this helps,
>>>> Peter
>>>>
>>>> On Thu, Apr 18, 2024 at 9:40, Gabor Kaszab <gaborkas...@apache.org> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I had the chance to explore this area of eq-deletes recently myself too. Apparently, this behavior is by design in Flink. The reason why it unconditionally writes an eq-delete for each insert (only in upsert mode, though) is to guarantee the uniqueness of the primary key: it drops the previous row with the same PK with a new eq-delete and then adds the new row in a new data file. Unfortunately this happens unconditionally, so no reading takes place, and even if there was no previous row with the given PK, Flink will write an eq-delete for it anyway. Yes, this can hurt read performance, so I guess the advised best practice is to compact your table frequently.
>>>>>
>>>>> Gabor
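To make the UPSERT vs. RETRACT difference concrete, here is a simplified illustration in terms of Flink's RowKind. This is not the actual Iceberg writer code, only a sketch of the semantics described above; the class, the Emitter interface, and the method names are made up for illustration.

    import org.apache.flink.types.RowKind;

    public class UpsertVsRetract {
      interface Emitter {
        void equalityDelete(Object key);
        void insert(Object row);
      }

      // Upsert mode: the writer cannot know whether the key already exists,
      // so every incoming row first retracts any previous version of the key.
      static void upsertMode(RowKind kind, Object key, Object row, Emitter out) {
        out.equalityDelete(key);
        if (kind != RowKind.DELETE) {
          out.insert(row);
        }
      }

      // Retract mode (upsert disabled): only explicit -D / -U events produce deletes.
      static void retractMode(RowKind kind, Object key, Object row, Emitter out) {
        if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
          out.equalityDelete(key);
        } else {
          out.insert(row);
        }
      }
    }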
>>>>>
>>>>> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta <adigu...@linkedin.com.invalid> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new record with a new equality field id, then an equality delete file is also created with the corresponding entry. For example, I executed the following commands in Flink SQL with Apache Iceberg:
>>>>>>
>>>>>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>>>>>   `id` INT UNIQUE COMMENT 'unique id',
>>>>>>   `data` STRING NOT NULL,
>>>>>>   PRIMARY KEY(`id`) NOT ENFORCED
>>>>>> ) with ('format-version'='2', 'write.upsert.enabled'='true');
>>>>>>
>>>>>> Then I inserted a record:
>>>>>>
>>>>>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>>>>>
>>>>>> It resulted in two files.
>>>>>>
>>>>>> Data file content:
>>>>>>
>>>>>> {"id":7,"data":"new value"}
>>>>>>
>>>>>> But it also created an equality delete file:
>>>>>>
>>>>>> {"id":7}
>>>>>>
>>>>>> I expected a delete file entry to be created for UPDATE / DELETE, but not for INSERT, as that might lead to read performance degradation for CDC tables, right?
>>>>>> Is it expected that fresh INSERTs will also have equality delete entries? If yes, what is the benefit of having an equality delete entry for INSERTs?
>>>>>>
>>>>>> Regards,
>>>>>> Aditya
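As a small aside, one way to watch the deletes pile up before deciding on compaction frequency is to list the delete files that scan planning attaches to each data file. A minimal sketch, assuming the Iceberg Java API and an already-loaded Table; the class and method names are placeholders.

    import java.io.IOException;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    public class InspectDeletes {
      // Prints every delete file (equality or positional) that scan planning
      // attaches to a data file in the current snapshot.
      public static void printDeletes(Table table) throws IOException {
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            for (DeleteFile delete : task.deletes()) {
              System.out.println(
                  task.file().path() + " -> " + delete.content() + " " + delete.path());
            }
          }
        }
      }
    }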