Hi Gourav,
The static table is broadcast prior to the join, so the shuffle is primarily
there to avoid OOM errors during the UDF.
It's not quite a Cartesian product, but yes, the join results in multiple records
per input record. The number of output records varies depending on the number of
duplicates in the static table.
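For concreteness, the shape of the job is roughly the following sketch (the paths, the join key, and the repartition factor are all illustrative, not the actual values):

```pyspark
from pyspark.sql import functions as F

# Static side is small enough to broadcast, so the join itself needs no shuffle.
static_df = spark.read.format("delta").load("/path/to/static")

joined = (
    spark.readStream.format("delta").load("/path/to/stream")
    .join(F.broadcast(static_df), on="key")
    # Explicit shuffle: spread the ~100x fan-out over more, smaller partitions
    # so the memory-hungry Pandas UDF doesn't blow up executor memory.
    .repartition(200)
)
```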
Hi,
Did you try sorting while writing out the data? All of this engineering
may not be required in that case.
Regards,
Gourav Sengupta
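A minimal sketch of that suggestion, assuming the join key is a column called "key" (column name and paths are assumptions):

```pyspark
# Write the static table sorted by the join key so related rows cluster into
# the same row groups; "key" and both paths are placeholders.
(
    spark.read.format("delta").load("/path/to/static")
    .sort("key")
    .write.format("delta")
    .mode("overwrite")
    .save("/path/to/static_sorted")
)
```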
On Sat, Feb 12, 2022 at 8:42 PM Chris Coutinho wrote:
> Setting the option in the cluster configuration solved the issue, and now
> we're able to specify the row group size based on the block size as intended.
Setting the option in the cluster configuration solved the issue, and now
we're able to specify the row group size based on the block size as
intended.
Thanks!
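For reference, the same setting at session level looks roughly like this (the 16 MB value is illustrative):

```pyspark
from pyspark.sql import SparkSession

# spark.hadoop.* entries are copied into the Hadoop configuration, which is
# where the parquet writer picks up the row group ("block") size.
spark = (
    SparkSession.builder
    .config("spark.hadoop.parquet.block.size", 16 * 1024 * 1024)
    .getOrCreate()
)
```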
On Fri, Feb 11, 2022 at 6:59 PM Adam Binford wrote:
> Writing to Delta might not support the write.option method. We set
> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
Hi,
Just trying to understand the problem before solving it.
1. You mentioned "The primary key of the static table is non-unique". This
appears to be a design flaw to me.
2. You once again mentioned "The Pandas UDF is then applied to the
resulting stream-static join and stored in a table. To avoid OOM errors…"
Writing to Delta might not support the write.option method. We set
spark.hadoop.parquet.block.size in our spark config for writing to Delta.
Adam
On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho wrote:
> I tried re-writing the table with the updated block size but it doesn't
> appear to have an effect on the row group size.
I tried re-writing the table with the updated block size but it doesn't
appear to have an effect on the row group size.
```pyspark
df = spark.read.format("delta").load("/path/to/source1")
# NB: the original snippet was cut off after ".partition"; the partitionBy
# column and the save target below are placeholders.
df.write \
    .format("delta") \
    .mode("overwrite") \
    .options(**{
        "parquet.block.size": "1m",
    }) \
    .partitionBy("date") \
    .save("/path/to/source1")
```
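One way to check whether the option actually took effect is to read the footer of one of the output files with pyarrow (the part-file name below is illustrative):

```pyspark
import pyarrow.parquet as pq

# Row group sizes are recorded in the parquet footer, so nothing is scanned.
meta = pq.ParquetFile("/path/to/source1/part-00000.snappy.parquet").metadata
for i in range(meta.num_row_groups):
    print(i, meta.row_group(i).total_byte_size)
```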
It should just be parquet.block.size indeed.
df.write.option("parquet.block.size", "16m").parquet(...)
This is an issue in how you write, not read, the parquet.
On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho wrote:
> Hi Adam,
>
> Thanks for the explanation on the empty partitions.
>
> We have the freedom to adjust how the source table is written, so if there
> are any improvements we can implement on the source side we'd be happy to
> look into that.
Hi Adam,
Thanks for the explanation on the empty partitions.
We have the freedom to adjust how the source table is written, so if there
are any improvements we can implement on the source side we'd be happy to
look into that.
It's not yet clear to me how you can reduce the row group size of the
underlying parquet files.
The smallest unit of work you can do on a parquet file (under the Delta
hood) is based on the parquet row group size, which by default is 128MB. If
you specify maxPartitionBytes of 10MB, what that will basically do is
create a partition for each 10MB of a file, but whatever partition covers
the start of a row group ends up doing the work for the whole row group,
while the remaining partitions stay empty.
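A quick way to see this effect (paths illustrative): lowering maxPartitionBytes inflates the partition count, but most of the extra partitions carry no row group and stay empty.

```pyspark
# 128MB row groups / 10MB partitions ~= 13 partitions per row group, of which
# only the one containing the row-group start does any real reading.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(10 * 1024 * 1024))
df = spark.read.format("delta").load("/path/to/source1")
print(df.rdd.getNumPartitions())  # high count, but most partitions are empty
```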
Hello,
We have a Spark structured streaming job that includes a stream-static join
and a Pandas UDF, streaming to/from Delta tables. The primary key of the
static table is non-unique, meaning that the streaming join results in
multiple records per input record - in our case a 100x increase. The Pandas
UDF is then applied to the resulting stream-static join and stored in a
table. To avoid OOM errors…
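For later readers, the pipeline being described is roughly the following sketch (paths, schema, join key, and the UDF body are all placeholders):

```pyspark
import pandas as pd
from pyspark.sql import functions as F

static_df = spark.read.format("delta").load("/path/to/static")

# Stand-in for the real (expensive) computation.
@F.pandas_udf("double")
def score(value: pd.Series) -> pd.Series:
    return value * 2.0

query = (
    spark.readStream.format("delta").load("/path/to/stream")
    .join(static_df, on="key")  # non-unique key: ~100x fan-out per input row
    .withColumn("score", score("value"))
    .writeStream.format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start("/path/to/output")
)
```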