Hi Aditya,

The definition of UPSERT is that we have 2 types of messages:
- DELETE - we need to remove the old record with the given id.
- UPSERT - we need to remove the old version of the record based on the id, and add a new version.
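For example, streaming two versions of the same id and then removing it looks like this (the key and values here are made up):

  UPSERT (id=1, data='a')   - removes any previous version of id=1, adds version 'a'
  UPSERT (id=1, data='b')   - removes version 'a', adds version 'b'
  DELETE (id=1)             - removes the record with id=1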
See: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion

This type of stream uses fewer events than a RETRACT stream when there are many updates. In a RETRACT stream we have 2 types of records:
- DELETE - we need to remove the old record with the given id.
- INSERT - insert a new record.
So updating a record costs two messages in a RETRACT stream, but only one in an UPSERT stream.

When we receive an UPSERT stream, we can't be sure that there is no previous record for the given id: we either have to scan the whole table to find the previous version, or always create an equality delete to remove it. When the Flink sink was implemented, the decision was to avoid scanning and write out the equality delete every time.

As Gabor already mentioned, it is recommended to compact the table to remove the equality deletes and enhance the read performance. Currently we have Spark compaction ready, and we are working on the Flink compaction too; a minimal Spark SQL sketch is appended below the quoted thread. See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl

I hope this helps,
Peter

Gabor Kaszab <gaborkas...@apache.org> wrote (on Thu, Apr 18, 2024, 9:40):

> Hey,
>
> I had the chance to explore this area of eq-deletes recently myself too.
> Apparently, this behavior is by design in Flink. The reason why it
> unconditionally writes an eq-delete for each insert (only in upsert mode,
> though) is to guarantee the uniqueness of the primary key. It drops the
> previous row with the same PK with a new eq-delete and then adds the new
> row in a new data file. This happens unconditionally, unfortunately, so no
> read is performed; even if there was no row previously with the given PK,
> Flink will write an eq-delete for it anyway. Yes, this can hurt read
> performance, so I guess the advised best practice is to compact your table
> frequently.
>
> Gabor
>
> On Wed, Apr 17, 2024 at 6:45 PM Aditya Gupta <adigu...@linkedin.com.invalid> wrote:
>
>> Hi all,
>>
>> In Flink SQL, in UPSERT mode, I have observed that if I INSERT a new
>> record with a new equality field id, then an equality delete file is also
>> created with the corresponding entry. For example, I executed the
>> following commands in Flink SQL with Apache Iceberg:
>>
>> CREATE TABLE `hadoop_catalog`.`testdb`.`upsert_test1` (
>>   `id` INT UNIQUE COMMENT 'unique id',
>>   `data` STRING NOT NULL,
>>   PRIMARY KEY(`id`) NOT ENFORCED
>> ) with ('format-version'='2', 'write.upsert.enabled'='true');
>>
>> Now I inserted a record:
>>
>> INSERT INTO upsert_test1 VALUES (7, 'new value');
>>
>> It resulted in 2 files.
>>
>> Data file content:
>>
>> {"id":7,"data":"new value"}
>>
>> But it also created an equality delete file:
>>
>> {"id":7}
>>
>> I expect that it will create a delete file entry for UPDATE / DELETE but
>> not for INSERT, as it might lead to performance degradation for reads for
>> CDC tables, right?
>>
>> Is it expected that fresh INSERTs will also have equality delete entries?
>> If yes, what is the benefit of having an equality delete entry for INSERTs?
>>
>> Regards,
>> Aditya
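A minimal Spark SQL sketch of the compaction mentioned above, assuming the table's catalog is registered in the Spark session as `hadoop_catalog` and reusing the table name from the thread. The `delete-file-threshold` option makes any data file with at least one attached delete eligible for rewriting:

  CALL hadoop_catalog.system.rewrite_data_files(
    table => 'testdb.upsert_test1',
    options => map('delete-file-threshold', '1')
  );

After the rewrite, the surviving data files no longer have equality deletes attached to them, so reads stop paying the merge cost.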