The performance degradation you see is because Spark cannot push down subquery 
expressions. That’s why Iceberg ends up scanning the complete table. You will 
still rewrite only files that have matches but I assume joining the complete 
table will be expensive.

In order to benefit from partition and min/max filtering, you have to ensure 
your DELETE statement has predicates that can be pushed down to Iceberg. One 
way to achieve that is by creating a single AND/OR expression if you have a 
reasonable number of ids to remove. Another way is to add a predicate on 
partition columns manually if you know you are only interested in particular 
partitions. For example, you could cache the temp view, compute all unique 
partitions and add an IN expression to your DELETE statement for that. You 
could also consider adding extra predicates that would benefit from min/max 
filtering if your data is properly clustered.

- Anton 

> On 25 May 2021, at 09:52, Huadong Liu <huadong...@gmail.com> wrote:
> 
> Hi iceberg-dev,
> 
> I have a table that is partitioned by id (custom partitioning at the moment, 
> not iceberg hidden partitioning) and event time. Individual DELETE finishes 
> reasonably fast, for example:
> 
> sql("DELETE FROM table where id_shard=111 and id=111456")
> sql("DELETE FROM table where id_shard=222 and id=222456")
> 
> I want to reduce table commits by creating a temp view and running a single 
> DELETE for both. It becomes extremely slow:
> 
> df = spark.createDataFrame([(111, 111456), (222, 222456)], ["id_shard_t", 
> "id_t"])
> df.createOrReplaceTempView("rows")
> sql(DELETE FROM table WHERE (id_shard, id) IN (SELECT id_shard_t, id_t FROM 
> rows))
> 
> Looks like I lose the benefit of partitioning with multi column join. Any 
> hint on reducing table commits while maintaining query efficiency? Thank you!
> 
> --
> Huadong
> 
> 

Reply via email to