Henryk,

I was able to reproduce your issue, and I got the desired result by creating
the table with SQL DDL and then appending to it. Here's the workaround:


package replicator

import org.apache.spark.sql.SparkSession

object Bucketing extends App {

  val spark = SparkSession.builder()
    .appName("ReproduceError")
    .master("local[*]")
    // Configure the Iceberg catalog (Hadoop type, local warehouse directory)
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
    .getOrCreate()

  import spark.implicits._

  val df = Seq(
    ("2023-10-01", 1, 10),
    ("2023-10-02", 2, 20),
    ("2023-10-03", 3, 30)
  ).toDF("date", "x", "y")


  // Step 1: Create the partitioned table with SQL DDL, which accepts bucket()
  spark.sql("""
    CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
      date STRING,
      x INT,
      y INT
    )
    USING iceberg
    PARTITIONED BY (date, x, bucket(10, y))
  """)

  // Step 2: Write data to the Iceberg table
  df.writeTo("my_catalog.default.some_table")
    .append()
}

It seems the V2 writer doesn't support a transform function such as bucket
inside partitionedBy, whereas the same transform is accepted in the DDL.
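
On the "too many files" part of your question, one option that may help is
Iceberg's write.distribution-mode table property. When it is set to hash,
Spark is asked to shuffle rows by the table's partition spec (including the
bucket transform) before writing, so there is no need to call repartition()
with bucket() yourself. Below is a minimal sketch, assuming the same catalog
and table names as above; the object name DistributionMode and the helper
hashDistributionDdl are made up for illustration, and I have not measured the
resulting file counts on your data.

```scala
import org.apache.spark.sql.SparkSession

object DistributionMode {

  // Hypothetical helper: builds the DDL that sets the Iceberg table property
  // write.distribution-mode to hash for the given table.
  def hashDistributionDdl(table: String): String =
    s"ALTER TABLE $table SET TBLPROPERTIES ('write.distribution-mode' = 'hash')"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DistributionMode")
      .master("local[*]")
      // Same catalog settings as in the workaround above
      .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.my_catalog.type", "hadoop")
      .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
      .getOrCreate()

    // Assumes the table was already created by the DDL in the workaround.
    spark.sql(hashDistributionDdl("my_catalog.default.some_table"))

    spark.stop()
  }
}
```

After this, a plain df.writeTo("my_catalog.default.some_table").append()
should have its rows clustered by partition before the files are written.
Recent Iceberg releases may already default to hash for partitioned tables, so
it is worth checking the table properties first.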


Best Regards
Soumasish Goswami
in: www.linkedin.com/in/soumasish
# (415) 530-0405


On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <henryk.cesnolo...@gmail.com> wrote:

> Hello.
>
> Maybe somebody has faced the same issue. I'm trying to write data to a table
> using the DataFrame API v2. The table is partitioned by buckets using
> df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
> col("y"))).using("iceberg").createOrReplace()
> Can I somehow prepare df in terms of partitions before writing to the
> destination, so as not to write too many files? The raw data is not grouped
> by keys. What I expected to work is something like
> df.repartition(col("x"), bucket(10,
> col("y"))).writeTo("some_table").partitionedBy(col("date"), col("x"),
> bucket(10, col("y"))).using("iceberg").createOrReplace()
> but the bucket function can't be used that way; it fails with [INTERNAL_ERROR]
> Cannot generate code for expression: bucket(10, input[0, bigint, true])
>
> Thanks
>
