Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Chris Coutinho
Hi Gourav, The static table is broadcasted prior to the join so the shuffle is primarily to avoid OOME during the UDF. It's not quite a Cartesian product, but yes the join results in multiple records per input record. The number of output records varies depending on the number of duplicates in th
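For context, a minimal sketch of the setup as described here (broadcast the static side of the join, then repartition before applying the UDF); the table paths, join key, and partition count are assumptions for illustration, not taken from the thread:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Streaming source and static lookup table (paths and join key are hypothetical).
stream_df = spark.readStream.format("delta").load("/path/to/stream_source")
static_df = spark.read.format("delta").load("/path/to/static_table")

# Broadcasting the static side means the join itself needs no shuffle;
# the explicit repartition afterwards only serves to keep each task's
# input to the Pandas UDF small enough to avoid OOMEs.
joined = stream_df.join(broadcast(static_df), on="key", how="inner")
repartitioned = joined.repartition(400)  # partition count is a guess
```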

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Gourav Sengupta
hi, Did you try sorting the data while writing it out? All of this engineering may not be required in that case. Regards, Gourav Sengupta On Sat, Feb 12, 2022 at 8:42 PM Chris Coutinho wrote: > Setting the option in the cluster configuration solved the issue, and now > we're able to specify
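If the suggestion is to sort the data as it is written, so that related rows end up clustered together, a minimal sketch might look like this; the source/target paths and sort column are assumptions:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Paths and the sort column are hypothetical.
df = spark.read.format("delta").load("/path/to/source")

# Sort within each output partition before writing so that rows with the
# same key are clustered inside the parquet row groups.
(df.sortWithinPartitions("key")
   .write
   .format("delta")
   .mode("overwrite")
   .save("/path/to/target"))
```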

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Chris Coutinho
Setting the option in the cluster configuration solved the issue, and now we're able to specify the row group size based on the block size as intended. Thanks! On Fri, Feb 11, 2022 at 6:59 PM Adam Binford wrote: > Writing to Delta might not support the write.option method. We set > spark.hadoop
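For reference, a sketch of setting the same option at the session/cluster level rather than per write; the 16 MB value is only an example, not the value used in the thread:

```python
from pyspark.sql import SparkSession

# Hadoop options passed with the spark.hadoop. prefix apply to every parquet
# file the session writes, including files written through the Delta sink.
spark = (SparkSession.builder
         .config("spark.hadoop.parquet.block.size", 16 * 1024 * 1024)  # 16 MB row groups
         .getOrCreate())

# The equivalent on the command line would be:
#   spark-submit --conf spark.hadoop.parquet.block.size=16777216 ...
```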

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Gourav Sengupta
Hi, just trying to understand the problem before solving it. 1. you mentioned "The primary key of the static table is non-unique". This appears to be a design flaw to me. 2. you once again mentioned "The Pandas UDF is then applied to the resulting stream-static join and stored in a table. To avo

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Adam Binford
Writing to Delta might not support the write.option method. We set spark.hadoop.parquet.block.size in our spark config for writing to Delta. Adam On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho wrote: > I tried re-writing the table with the updated block size but it doesn't > appear to have an ef

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
I tried re-writing the table with the updated block size but it doesn't appear to have an effect on the row group size. ```pyspark df = spark.read.format("delta").load("/path/to/source1") df.write \ .format("delta") \ .mode("overwrite") \ .options(**{ "parquet.block.size": "1m", }) \ .partition
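Laid out as a script, the attempt above looks roughly like this; the message is truncated, so the partitionBy column, target path, and save call are guesses, and as noted elsewhere in the thread the per-write option may simply be ignored by the Delta writer:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("delta").load("/path/to/source1")

(df.write
   .format("delta")
   .mode("overwrite")
   .option("parquet.block.size", 1024 * 1024)  # ~1 MB row groups; may not be honoured by Delta
   .partitionBy("some_column")                 # column name is a placeholder
   .save("/path/to/target"))
```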

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Sean Owen
It should just be parquet.block.size indeed. df.write.option("parquet.block.size", "16m").parquet(...) This is an issue in how you write, not read, the parquet. On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho wrote: > Hi Adam, > > Thanks for the explanation on the empty partitions. > > We have
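One way to check whether the setting took effect is to inspect the row groups of an output file directly, for example with pyarrow (the file path is a placeholder):

```python
import pyarrow.parquet as pq

# Print the size of each row group in one output file to see whether
# parquet.block.size had any effect on the written files.
pf = pq.ParquetFile("/path/to/target/part-00000.snappy.parquet")
for i in range(pf.metadata.num_row_groups):
    rg = pf.metadata.row_group(i)
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size / 1024 / 1024:.1f} MiB")
```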

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
Hi Adam, Thanks for the explanation on the empty partitions. We have the freedom to adjust how the source table is written, so if there are any improvements we can implement on the source side we'd be happy to look into that. It's not yet clear to me how you can reduce the row group size of the

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Adam Binford
The smallest unit of work you can do on a parquet file (under the delta hood) is based on the parquet row group size, which by default is 128mb. If you specify maxPartitionBytes of 10mb, what that will basically do is create a partition for each 10mb of a file, but whatever partition covers the par
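For completeness, this is the setting being discussed; lowering it cannot split a file below its row-group boundaries, so with the default 128 MB row groups it will not produce meaningfully smaller partitions (the 10m value below is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Ask Spark to aim for ~10 MB input splits. Files are still read in whole
# row groups, so the effective lower bound on partition size is the parquet
# row group size of the source files.
spark.conf.set("spark.sql.files.maxPartitionBytes", "10m")
```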

Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
Hello, We have a spark structured streaming job that includes a stream-static join and a Pandas UDF, streaming to/from delta tables. The primary key of the static table is non-unique, meaning that the streaming join results in multiple records per input record - in our case 100x increase. The Pand
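For readers skimming the thread, a rough sketch of the kind of pipeline being described (a stream-static join on a non-unique key, then a Pandas UDF, streaming between Delta tables); all paths, column names, and the UDF body are assumptions:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf

spark = SparkSession.builder.getOrCreate()

# Streaming source and static lookup table (paths are hypothetical).
stream_df = spark.readStream.format("delta").load("/path/to/stream_source")
static_df = spark.read.format("delta").load("/path/to/static_table")

# The static table's key is non-unique, so every streaming row can match
# many static rows (roughly a 100x fan-out in this case).
joined = stream_df.join(static_df, on="key", how="inner")

@pandas_udf("double")
def score(values: pd.Series) -> pd.Series:
    # Placeholder for the memory-hungry per-batch computation.
    return values * 2.0

result = joined.withColumn("score", score(col("value")))

query = (result.writeStream
         .format("delta")
         .option("checkpointLocation", "/path/to/checkpoint")
         .start("/path/to/target"))
```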