Henryk, I could reproduce your issue and achieve the desired result using SQL DDL. Here's the workaround.
package replicator

import org.apache.spark.sql.SparkSession

object Bucketing extends App {

  val spark = SparkSession.builder()
    .appName("ReproduceError")
    .master("local[*]")
    // Configure the Iceberg catalog
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
    .getOrCreate()

  import spark.implicits._

  val df = Seq(
    ("2023-10-01", 1, 10),
    ("2023-10-02", 2, 20),
    ("2023-10-03", 3, 30)
  ).toDF("date", "x", "y")

  // Step 1: Create the Iceberg table with the bucket transform via SQL DDL
  spark.sql("""
    CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
      date STRING,
      x INT,
      y INT
    )
    USING iceberg
    PARTITIONED BY (date, x, bucket(10, y))
  """)

  // Step 2: Write data to the Iceberg table
  df.writeTo("my_catalog.default.some_table")
    .append()
}

It seems the DataFrame API v2 writer doesn't support a transform function such as bucket inside partitionedBy, so creating the table with SQL DDL first and then appending works around it. As for repartitioning the DataFrame before the write, see the sketch below your quoted message.

Best Regards
Soumasish Goswami
in: www.linkedin.com/in/soumasish
# (415) 530-0405


On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <
henryk.cesnolo...@gmail.com> wrote:

> Hello.
>
> Maybe somebody has faced the same issue. Trying to write data to the table
> while using DataFrame API v2. Table is partitioned by buckets using
>
>   df.writeTo("some_table")
>     .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
>     .using("iceberg")
>     .createOrReplace()
>
> Can I somehow prepare df in terms of partitions before writing to the
> destination, so as not to write too many files? Raw data is not grouped by
> keys. Expectations are like
>
>   df.repartition(col("x"), bucket(10, col("y")))
>     .writeTo("some_table")
>     .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
>     .using("iceberg")
>     .createOrReplace()
>
> The bucket function can't be used in that way, because I'm getting
> [INTERNAL_ERROR] Cannot generate code for expression: bucket(10, input[0,
> bigint, true])
>
> Thanks
>
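On the repartitioning question itself: a minimal, untested sketch is below. It assumes a recent Iceberg/Spark combination (roughly Iceberg 1.3+ on Spark 3.3+) where Iceberg registers its function catalog, so the bucket transform can be called in SQL as my_catalog.system.bucket. Please verify against your versions; the catalog and table names are the ones from this thread. The idea is to repartition with the same bucket expression the table uses, going through expr instead of the Scala bucket() helper that triggers the codegen error:

  import org.apache.spark.sql.functions.{col, expr}

  // Repartition so rows for the same (date, x, bucket) partition land in the
  // same task before the write. my_catalog.system.bucket is Iceberg's bucket
  // transform exposed through its function catalog (assumption: your version
  // ships it); the 10 must match the bucket count in the table spec.
  val prepared = df.repartition(
    col("date"),
    col("x"),
    expr("my_catalog.system.bucket(10, y)")
  )

  prepared.writeTo("my_catalog.default.some_table").append()

Alternatively, if I recall correctly, setting the table property write.distribution-mode to hash lets Iceberg request a hash distribution on the partition keys from Spark at write time, which avoids the manual repartition entirely.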