Thank you, Anton! Follow-up questions inline.

On Tue, May 25, 2021 at 10:31 AM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:
> The performance degradation you see is because Spark cannot push down
> subquery expressions. That's why Iceberg ends up scanning the complete
> table. You will still rewrite only files that have matches, but I assume
> joining the complete table will be expensive.
>
> In order to benefit from partition and min/max filtering, you have to
> ensure your DELETE statement has predicates that can be pushed down to
> Iceberg. One way to achieve that is by creating a single AND/OR expression
> if you have a reasonable number of ids to remove.

Is there a documented limit on the number of OR expressions to chain? (I
sketched this option below the quoted thread to check my understanding.)

> Another way is to add a predicate on partition columns manually if you
> know you are only interested in particular partitions. For example, you
> could cache the temp view, compute all unique partitions and add an IN
> expression to your DELETE statement for that.

Are you saying to chain the partitions, something like
*sql("DELETE FROM table WHERE (id_shard=111 AND id IN (SELECT id_t FROM rows)) OR (id_shard=222 AND id IN (SELECT id_t FROM rows))")*?
This is very similar to the option above. Ideally I want to benefit from
Spark parallelism without having to manage a large number of DELETE
statements explicitly. (A sketch of the partition IN approach is also
appended below.)

> You could also consider adding extra predicates that would benefit from
> min/max filtering if your data is properly clustered.
>
> - Anton
>
> On 25 May 2021, at 09:52, Huadong Liu <huadong...@gmail.com> wrote:
>
> Hi iceberg-dev,
>
> I have a table that is partitioned by id (custom partitioning at the
> moment, not Iceberg hidden partitioning) and event time. Individual DELETE
> finishes reasonably fast, for example:
>
> *sql("DELETE FROM table where id_shard=111 and id=111456")*
> *sql("DELETE FROM table where id_shard=222 and id=222456")*
>
> I want to reduce table commits by creating a temp view and running a
> single DELETE for both. It becomes extremely slow:
>
> *df = spark.createDataFrame([(111, 111456), (222, 222456)], ["id_shard_t",
> "id_t"])*
> *df.createOrReplaceTempView("rows")*
> *sql("DELETE FROM table WHERE (id_shard, id) IN (SELECT id_shard_t, id_t
> FROM rows)")*
>
> It looks like I lose the benefit of partitioning with the multi-column
> join. Any hint on reducing table commits while maintaining query
> efficiency? Thank you!
>
> --
> Huadong
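To make sure I follow the first option (a single AND/OR expression), here is a
minimal PySpark sketch of what I have in mind. This is my own illustration,
not code from Anton; it assumes an active `spark` session with the Iceberg SQL
extensions enabled and reuses the `table`, `id_shard`, and `id` names from the
thread.

*# Sketch of the "single AND/OR expression" option: build one OR-of-ANDs*
*# predicate from a small, known list of (id_shard, id) pairs so the filter*
*# can be pushed down to Iceberg for partition and min/max pruning.*
*rows_to_delete = [(111, 111456), (222, 222456)]*

*predicate = " OR ".join(*
*    f"(id_shard = {shard} AND id = {id_})" for shard, id_ in rows_to_delete*
*)*

*# One DELETE, one commit, and a fully pushable filter.*
*spark.sql(f"DELETE FROM table WHERE {predicate}")*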
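And here is how I read the second option (adding an explicit partition
predicate computed from the temp view), again just a sketch under the same
assumptions; the `rows`, `id_shard_t`, and `id_t` names come from my original
example.

*# Sketch of the "partition IN expression" option: keep the temp-view*
*# subquery for exact matching, but compute the distinct partition values*
*# up front and add a plain IN predicate that Spark can push down.*
*df = spark.createDataFrame([(111, 111456), (222, 222456)], ["id_shard_t", "id_t"])*
*df.cache()*
*df.createOrReplaceTempView("rows")*

*shards = [str(r.id_shard_t) for r in df.select("id_shard_t").distinct().collect()]*
*shard_filter = f"id_shard IN ({', '.join(shards)})"*

*# The IN on the partition column prunes partitions; the subquery still*
*# restricts the delete to the exact (id_shard, id) pairs.*
*spark.sql(f"""*
*    DELETE FROM table*
*    WHERE {shard_filter}*
*      AND (id_shard, id) IN (SELECT id_shard_t, id_t FROM rows)*
*""")*

Does that match what you meant?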