Thanks Ryan for responding. I think the Iceberg streaming-writes documentation actually recommends fanout writers for streaming: https://iceberg.apache.org/docs/latest/spark-structured-streaming/#writing-against-partitioned-table

Regarding too many files, that can happen regardless of fanout or presorting the dataframe, right? For example, I am ingesting event data and writing it to a table partitioned by date(ts), so for each day there is only one partition. But since I'm ingesting and writing event data every 5 minutes, by the end of the day I will have more than 200 files per partition. It seems I will have to run compaction regardless.
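For what it's worth, the compaction I have in mind is roughly the sketch below, using Iceberg's rewrite_data_files procedure. The catalog and table names are placeholders, and the where predicate is only an example that limits the rewrite to recent partitions:

    # Sketch: periodic small-file compaction via Iceberg's rewrite_data_files procedure.
    # "my_catalog" and "db.events" are placeholder names; adjust the predicate as needed.
    spark.sql("""
        CALL my_catalog.system.rewrite_data_files(
            table => 'db.events',
            where => 'date >= date_sub(current_date(), 1)'
        )
    """)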
Best
Nirav

On Thu, Aug 31, 2023 at 8:59 AM Ryan Blue <b...@tabular.io> wrote:

> We generally don't recommend fanout writers because they create lots of
> small data files. It also isn't clear why the table's partitioning isn't
> causing Spark to distribute the data properly -- maybe you're using an old
> Spark version?
>
> In any case, you can distribute the data yourself to align with the
> table's partitioning. You can either use `.repartition` with the dataframes
> API, or you can sort. I recommend sorting by adding an `ORDER BY` statement
> to your SQL.
>
> Ryan
>
> On Wed, Aug 30, 2023 at 10:36 AM Nirav Patel <nira...@gmail.com> wrote:
>
>> Should I try "fanout-enabled" option within foreachBatch method where I
>> do dataframe.write ?
>>
>> On Wed, Aug 30, 2023 at 10:29 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using spark structured streaming and using foreachBatch sink to
>>> append to iceberg dual hidden partitioned table.
>>> I got this infamous error about input dataframe or partition needing to
>>> be clustered:
>>>
>>> *Incoming records violate the writer assumption that records are
>>> clustered by spec and by partition within each spec. Either cluster the
>>> incoming records or switch to fanout writers.*
>>>
>>> I tried setting "fanout-enabled" to "true" before calling foreachBatch
>>> but it didn't work at all. Got same error.
>>>
>>> I tried partitionedBy(days("date"), col("customerid")) and that didn't
>>> work either.
>>>
>>> Then I used spark sql approach:
>>> INSERT INTO {dest_schema_fqn}
>>> SELECT * from {success_agg_tbl} order by date(date),
>>> tenant
>>>
>>> and that worked.
>>>
>>> I know of following table level config:
>>> write.spark.fanout.enabled - False
>>> write.distribution-mode - None
>>> but I have left it to defaults as I assume writer will override those
>>> settings.
>>>
>>> so do "fanout-enabled" option have effect when using with foreachBatch?
>>> (I'm new to spark streaming as well)
>>>
>>> thanks
>>>
>>
>
> --
> Ryan Blue
> Tabular
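PS, for the archives: a rough sketch of the two approaches discussed above inside foreachBatch, i.e. sorting each micro-batch to match the table's partitioning versus the per-write "fanout-enabled" option. The catalog, table, column, and checkpoint names below are placeholders rather than my actual job:

    # Each micro-batch is written with the DataFrameWriterV2 API inside foreachBatch.
    def write_batch(batch_df, batch_id):
        (batch_df
            # Option A: cluster records to match the table spec (days(date), tenant),
            # which is what the ORDER BY in the SQL INSERT achieved.
            .sortWithinPartitions("date", "tenant")
            .writeTo("my_catalog.db.events")
            # Option B (instead of sorting): per-write override of write.spark.fanout.enabled
            # .option("fanout-enabled", "true")
            .append())

    (events_df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", "s3://my-bucket/checkpoints/events")  # placeholder path
        .start())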