Thanks, Ryan, for responding.
I think the Iceberg streaming-writes documentation recommends using fanout
writers for streaming:
https://iceberg.apache.org/docs/latest/spark-structured-streaming/#writing-against-partitioned-table
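
For the plain writeStream path, my reading of that page is that it would look
roughly like this (table name, checkpoint path, and trigger interval are just
placeholders; I haven't verified this exact snippet):

    (events_df.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(processingTime="5 minutes")
        .option("fanout-enabled", "true")
        .option("checkpointLocation", "/tmp/checkpoints/events")  # placeholder path
        .toTable("db.events"))  # placeholder table name
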
Regarding too many files: that can happen regardless of fanout or
pre-sorting the dataframe, right? For example, I am ingesting event data and
writing it to a table partitioned by date(ts), so for each day there is only
one partition. But since I'm ingesting and writing every 5 minutes, by the
end of the day I will have more than 200 files per partition. It seems I will
have to run compaction regardless.
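
For the compaction itself, I assume I'd periodically run Iceberg's
rewrite_data_files procedure from a maintenance job, something like this
(catalog and table names are placeholders):

    # compact the small files left behind by the streaming writes
    spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db.events')")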

Best
Nirav

On Thu, Aug 31, 2023 at 8:59 AM Ryan Blue <b...@tabular.io> wrote:

> We generally don't recommend fanout writers because they create lots of
> small data files. It also isn't clear why the table's partitioning isn't
> causing Spark to distribute the data properly -- maybe you're using an old
> Spark version?
>
> In any case, you can distribute the data yourself to align with the
> table's partitioning. You can either use `.repartition` with the dataframes
> API, or you can sort. I recommend sorting by adding an `ORDER BY` statement
> to your SQL.
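>
> In a foreachBatch sink that might look something like this (untested sketch;
> the table and column names here are placeholders, not from your setup):
>
>     from pyspark.sql import functions as F
>
>     def write_batch(batch_df, batch_id):
>         (batch_df
>             # cluster rows to match the table's partitioning before writing
>             .orderBy(F.to_date("ts"), "tenant")
>             # or: .repartition(F.to_date("ts"), "tenant")
>             .writeTo("db.events")
>             .append())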
>
> Ryan
>
> On Wed, Aug 30, 2023 at 10:36 AM Nirav Patel <nira...@gmail.com> wrote:
>
>> Should I try the "fanout-enabled" option within the foreachBatch method
>> where I do dataframe.write?
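>>
>> Something like this, I mean (placeholder table name; I'm not sure this is
>> the right place for the option):
>>
>>     def write_batch(batch_df, batch_id):
>>         (batch_df
>>             .writeTo("db.events")
>>             .option("fanout-enabled", "true")  # Iceberg Spark write option
>>             .append())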
>>
>> On Wed, Aug 30, 2023 at 10:29 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark Structured Streaming with a foreachBatch sink to append
>>> to an Iceberg table that has two hidden partition columns.
>>> I got this infamous error about the incoming dataframe or partitions
>>> needing to be clustered:
>>>
>>> *Incoming records violate the writer assumption that records are
>>> clustered by spec and by partition within each spec. Either cluster the
>>> incoming records or switch to fanout writers.*
>>>
>>> I tried setting "fanout-enabled" to "true" before calling foreachBatch,
>>> but it didn't work at all; I got the same error.
>>>
>>> I tried partitionedBy(days("date"), col("customerid")) and that didn't
>>> work either.
>>>
>>> Then I used the Spark SQL approach:
>>>
>>> INSERT INTO {dest_schema_fqn}
>>> SELECT * FROM {success_agg_tbl} ORDER BY date(date), tenant
>>>
>>> and that worked.
>>>
>>> I know of the following table-level configs:
>>> write.spark.fanout.enabled (default: false)
>>> write.distribution-mode (default: none)
>>> but I have left them at their defaults, as I assume the writer options will
>>> override those settings.
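>>>
>>> For reference, I think setting them at the table level would look roughly
>>> like this (placeholder table name; property names as listed in the Iceberg
>>> table properties docs):
>>>
>>>     spark.sql("""
>>>         ALTER TABLE db.events SET TBLPROPERTIES (
>>>             'write.spark.fanout.enabled' = 'true',
>>>             'write.distribution-mode' = 'hash'
>>>         )
>>>     """)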
>>>
>>> so do "fanout-enabled" option have effect when using with foreachBatch?
>>> (I'm new to spark streaming as well)
>>>
>>> thanks
>>>
>>
>
> --
> Ryan Blue
> Tabular
>
