Henryk Česnolovič <henryk.cesnolo...@gmail.com>
08:30 (5 hours ago)
to Soumasish
Ok, never mind. It seems we don't need to repartition, as Spark handles it
itself.
df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
col("y"))).using("iceberg").createOrReplace()
or later
df.writeTo("some_table").append()

Spark understands that the partitioning is defined on the table and seems to
repartition automatically.
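
For completeness, here is the whole flow in one place as a minimal sketch,
assuming the catalog and table name from Soumasish's example below
(my_catalog.default.some_table); only the imports are added on top of what is
already in this thread:

import org.apache.spark.sql.functions.{bucket, col}

// First write: declare the partitioning (including the bucket transform) on
// the table itself via the DataFrameWriterV2 API.
df.writeTo("my_catalog.default.some_table")
  .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
  .using("iceberg")
  .createOrReplace()

// Later appends: no explicit repartition here; per the observation above,
// Spark handles the distribution for the table's partition spec.
df.writeTo("my_catalog.default.some_table").append()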

Best regards,
Henryk


On Sun, 1 Dec 2024 at 00:57, Soumasish <soumas...@gmail.com> wrote:

> Henryk,
>
> I could reproduce your issue and achieve the desired result using SQL DDL.
> Here's the workaround.
>
>
> package replicator
>
> import org.apache.spark.sql.SparkSession
>
> object Bucketing extends App {
>
>   val spark = SparkSession.builder()
>     .appName("ReproduceError")
>     .master("local[*]")
>     // Configure the Iceberg catalog
>     .config("spark.sql.catalog.my_catalog", 
> "org.apache.iceberg.spark.SparkCatalog")
>     .config("spark.sql.catalog.my_catalog.type", "hadoop")
>     .config("spark.sql.catalog.my_catalog.warehouse", 
> "file:///tmp/iceberg_warehouse")
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val df = Seq(
>     ("2023-10-01", 1, 10),
>     ("2023-10-02", 2, 20),
>     ("2023-10-03", 3, 30)
>   ).toDF("date", "x", "y")
>
>
>   spark.sql("""
>     CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
>       date STRING,
>       x INT,
>       y INT
>     )
>     USING iceberg
>     PARTITIONED BY (date, x, bucket(10, y))
>   """)
>
>   // Step 2: Write data to the Iceberg table
>   df.writeTo("my_catalog.default.some_table")
>     .append()
> }
>
> Seems like the V2 writer (DataFrameWriterV2) doesn't support a transform
> function like bucket inside partitionedBy.
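>
> If you want to double-check what actually got written, and assuming your
> Iceberg version exposes the metadata tables, something like this should show
> the partition layout (a sketch, not verified here):
>
> spark.sql("SELECT * FROM my_catalog.default.some_table.partitions").show(truncate = false)
> spark.sql("SELECT file_path, partition FROM my_catalog.default.some_table.files").show(truncate = false)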
>
>
> Best Regards
> Soumasish Goswami
> in: www.linkedin.com/in/soumasish
> # (415) 530-0405
>
> On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <
> henryk.cesnolo...@gmail.com> wrote:
>
>> Hello.
>>
>> Maybe somebody has faced the same issue. I'm trying to write data to a
>> table using the DataFrame API v2. The table is partitioned by buckets via
>>
>> df.writeTo("some_table")
>>   .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
>>   .using("iceberg")
>>   .createOrReplace()
>>
>> Can I somehow prepare the df in terms of partitions before writing to the
>> destination, so as not to write too many files? The raw data is not grouped
>> by keys. I expected something like
>>
>> df.repartition(col("x"), bucket(10, col("y")))
>>   .writeTo("some_table")
>>   .partitionedBy(col("date"), col("x"), bucket(10, col("y")))
>>   .using("iceberg")
>>   .createOrReplace()
>>
>> but the bucket function can't be used that way: it fails with [INTERNAL_ERROR]
>> Cannot generate code for expression: bucket(10, input[0, bigint, true])
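>>
>> Just to illustrate the kind of preparation I mean, a rough sketch (note
>> that Spark's hash() is not Iceberg's bucket transform, so a manual key like
>> this would only approximate the table's buckets):
>>
>> import org.apache.spark.sql.functions.{col, hash, lit, pmod}
>>
>> // shuffle by x and a hash-derived pseudo-bucket of y before the write
>> val prepared = df.repartition(col("x"), pmod(hash(col("y")), lit(10)))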
>>
>> Thanks
>>
>
