Henryk Česnolovič <henryk.cesnolo...@gmail.com> 08:30 to Soumasish

Ok, never mind. It seems we don't need to do the repartition ourselves, as Spark handles it:

df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10, col("y"))).using("iceberg").createOrReplace()

or later

df.writeTo("some_table").append()

Spark sees that the partitioning is declared on the table and, it seems, does the repartition automatically.
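For reference, a minimal end-to-end sketch of that pattern (untested here; it assumes Spark 3.x with the Iceberg runtime on the classpath, and reuses the my_catalog Hadoop catalog and sample data from Soumasish's snippet below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{bucket, col}

object BucketedWrite extends App {

  val spark = SparkSession.builder()
    .appName("BucketedWrite")
    .master("local[*]")
    // Iceberg catalog backed by a local Hadoop warehouse
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
    .getOrCreate()

  import spark.implicits._

  val df = Seq(
    ("2023-10-01", 1, 10L),
    ("2023-10-02", 2, 20L),
    ("2023-10-03", 3, 30L)
  ).toDF("date", "x", "y")

  // Declare the partition spec on the table itself; bucket(...) here is the
  // partition transform from org.apache.spark.sql.functions, which is only
  // valid inside DataFrameWriterV2.partitionedBy.
  df.writeTo("my_catalog.default.some_table")
    .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
    .using("iceberg")
    .createOrReplace()

  // Later appends pick up the table's partition spec; no manual repartition needed.
  df.writeTo("my_catalog.default.some_table").append()
}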
Best regards,
Henryk

On Sun, 1 Dec 2024 at 00:57, Soumasish <soumas...@gmail.com> wrote:

> Henryk,
>
> I could reproduce your issue and achieve the desired result using SQL DDL.
> Here's the workaround.
>
> package replicator
>
> import org.apache.spark.sql.SparkSession
>
> object Bucketing extends App {
>
>   val spark = SparkSession.builder()
>     .appName("ReproduceError")
>     .master("local[*]")
>     // Configure the Iceberg catalog
>     .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
>     .config("spark.sql.catalog.my_catalog.type", "hadoop")
>     .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val df = Seq(
>     ("2023-10-01", 1, 10),
>     ("2023-10-02", 2, 20),
>     ("2023-10-03", 3, 30)
>   ).toDF("date", "x", "y")
>
>   // Step 1: Create the table with the bucket transform via SQL DDL
>   spark.sql("""
>     CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
>       date STRING,
>       x INT,
>       y INT
>     )
>     USING iceberg
>     PARTITIONED BY (date, x, bucket(10, y))
>   """)
>
>   // Step 2: Write data to the Iceberg table
>   df.writeTo("my_catalog.default.some_table")
>     .append()
> }
>
> Seems like the V2 writer doesn't support a transform function like bucket
> inside partitionedBy.
>
> Best Regards
> Soumasish Goswami
> in: www.linkedin.com/in/soumasish
> # (415) 530-0405
>
> On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <henryk.cesnolo...@gmail.com> wrote:
>
>> Hello.
>>
>> Maybe somebody has faced the same issue. I am trying to write data to a
>> table using the DataFrame API v2. The table is partitioned by buckets using
>> df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10, col("y"))).using("iceberg").createOrReplace()
>> Can I somehow prepare the df in terms of partitions before writing to the
>> destination, so as not to write too many files? The raw data is not grouped
>> by keys. I expected something like
>> df.repartition(col("x"), bucket(10, col("y"))).writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10, col("y"))).using("iceberg").createOrReplace()
>> but the bucket function can't be used that way, because it fails with
>> [INTERNAL_ERROR] Cannot generate code for expression: bucket(10, input[0, bigint, true])
>>
>> Thanks
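A note on the codegen error in the original message: the bucket(...) Column function in org.apache.spark.sql.functions is a partition transform that Spark only accepts inside DataFrameWriterV2.partitionedBy, which is why repartition(col("x"), bucket(10, col("y"))) cannot be code-generated. If explicit pre-clustering were ever needed, one possible approach (an untested sketch, assuming Spark 3.3+ and an Iceberg catalog named my_catalog that exposes its SQL functions under the system namespace) would be to compute the bucket value as a helper column, repartition on it, and drop it before the write; spark and df here are the session and DataFrame from the snippets above:

import org.apache.spark.sql.functions.{col, expr}

// my_catalog.system.bucket is assumed to be Iceberg's SQL bucket function,
// i.e. the same transform Iceberg applies when routing rows to partitions.
val clustered = df
  .withColumn("y_bucket", expr("my_catalog.system.bucket(10, y)"))
  .repartition(col("date"), col("x"), col("y_bucket"))
  .drop("y_bucket")

clustered.writeTo("my_catalog.default.some_table").append()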