I might have misunderstood the issue.
Spark will indeed repartition the data while writing; what it won't do is
write precisely 10 files inside each date partition folder, sorted by col x.
Typically this kind of fine-grained write configuration is useful when there is a
downstream consumer that will use the output as an input.
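
If you do want tighter control over the layout, a rough sketch (using the
column names from your snippet; the exact file counts are not guaranteed)
is to shape the DataFrame yourself before the write:

  import org.apache.spark.sql.functions.col

  // Hash-partition on the coarse partition columns, then sort within each
  // task so the writer keeps few files open per task.
  val shaped = df
    .repartition(col("date"), col("x"))
    .sortWithinPartitions(col("date"), col("x"))

  shaped.writeTo("some_table").append()

How many files each date/x folder ends up with still depends on data skew
and on the table's write settings, so treat this as an approximation.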
Anyway, glad you have a solution.

Best Regards
Soumasish Goswami
in: www.linkedin.com/in/soumasish
# (415) 530-0405

On Sun, Dec 1, 2024 at 4:29 AM Henryk Česnolovič <
henryk.cesnolo...@gmail.com> wrote:

> Ok, never mind. It seems we don't need to repartition, as Spark handles it itself:
> df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
> col("y"))).using("iceberg").createOrReplace()
> or later
> df.writeTo("some_table").append()
>
> Spark understands that the partitioning is defined on the table and repartitions
> automatically, it seems.
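>
> If you want that shuffle to be explicit rather than implicit, a small sketch
> (assuming a reasonably recent Spark/Iceberg setup) is to pin it with Iceberg's
> write.distribution-mode table property; 'hash' clusters rows by the partition
> spec before writing:
>
>   spark.sql("""
>     ALTER TABLE some_table
>     SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
>   """)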
>
> Best regards,
> Henryk
>
>
> On Sun, 1 Dec 2024 at 00:57, Soumasish <soumas...@gmail.com> wrote:
>
>> Henryk,
>>
>> I could reproduce your issue and achieve the desired result using SQL
>> DDL. Here's the workaround.
>>
>>
>> package replicator
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object Bucketing extends App {
>>
>>   val spark = SparkSession.builder()
>>     .appName("ReproduceError")
>>     .master("local[*]")
>>     // Configure the Iceberg catalog
>>     .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
>>     .config("spark.sql.catalog.my_catalog.type", "hadoop")
>>     .config("spark.sql.catalog.my_catalog.warehouse", "file:///tmp/iceberg_warehouse")
>>     .getOrCreate()
>>
>>   import spark.implicits._
>>
>>   val df = Seq(
>>     ("2023-10-01", 1, 10),
>>     ("2023-10-02", 2, 20),
>>     ("2023-10-03", 3, 30)
>>   ).toDF("date", "x", "y")
>>
>>
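>>   // Step 1: Create the Iceberg table with a bucket transform via SQL DDL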
>>   spark.sql("""
>>     CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
>>       date STRING,
>>       x INT,
>>       y INT
>>     )
>>     USING iceberg
>>     PARTITIONED BY (date, x, bucket(10, y))
>>   """)
>>
>>   // Step 2: Write data to the Iceberg table
>>   df.writeTo("my_catalog.default.some_table")
>>     .append()
>> }
>>
>> It seems the V2 writer doesn't support a transform function like bucket
>> inside partitionedBy.
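>>
>> If you still want to pre-shuffle in the DataFrame API before the append, a
>> rough sketch (my assumption, not verified on every Spark version) is to
>> emulate the bucket with plain hash/pmod; note that Spark's hash is not
>> guaranteed to line up with Iceberg's bucket transform, so this only controls
>> the shuffle granularity, not the exact bucket assignment:
>>
>>   import org.apache.spark.sql.functions.{col, hash, lit, pmod}
>>
>>   df.repartition(col("x"), pmod(hash(col("y")), lit(10)))
>>     .writeTo("my_catalog.default.some_table")
>>     .append()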
>>
>>
>> Best Regards
>> Soumasish Goswami
>> in: www.linkedin.com/in/soumasish
>> # (415) 530-0405
>>
>>
>>
>>
>> On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <
>> henryk.cesnolo...@gmail.com> wrote:
>>
>>> Hello.
>>>
>>> Maybe somebody has faced the same issue. I am trying to write data to a
>>> table using the DataFrame API v2. The table is partitioned by buckets using
>>> df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
>>> col("y"))).using("iceberg").createOrReplace()
>>> Can I somehow prepare the df in terms of partitions before writing to the
>>> destination, so as not to write too many files? The raw data is not grouped
>>> by keys. The expectation is something like
>>> df.repartition(col("x"), bucket(10,
>>> col("y"))).writeTo("some_table").partitionedBy(col("date"), col("x"),
>>> bucket(10, col("y"))).using("iceberg").createOrReplace()
>>> but the bucket function can't be used that way; I get [INTERNAL_ERROR]
>>> Cannot generate code for expression: bucket(10, input[0, bigint, true])
>>>
>>> Thanks
>>>
>>
