Hi Rishi,

Maybe you have already done these steps.
Can you check the size of the DataFrame you are trying to broadcast using
logInfo(SizeEstimator.estimate(df))
and adjust the driver memory accordingly?
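
For PySpark, a rough sketch of that size check could look like the following. It assumes the JVM-side org.apache.spark.util.SizeEstimator is reachable through the private py4j handles (_jvm and _jdf), which are not a public API, and the helper names estimate_jvm_size_bytes and human_readable are hypothetical. The value returned is an estimate of the JVM object graph behind the DataFrame handle, so treat it only as a rough indication.

```python
def human_readable(num_bytes):
    """Format a byte count with binary units, e.g. 1536 -> '1.5 KiB'."""
    size = float(num_bytes)
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if size < 1024.0 or unit == "TiB":
            return "{:.1f} {}".format(size, unit)
        size /= 1024.0


def estimate_jvm_size_bytes(spark, df):
    """Ask Spark's JVM-side SizeEstimator for a rough size of `df`.

    Assumption: `spark` is an active SparkSession and the private
    attributes `_jvm` and `_jdf` are available (not public API).
    """
    jvm = spark.sparkContext._jvm
    return jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf)


# Usage (needs a running Spark session, so it is left commented out):
# print(human_readable(estimate_jvm_size_bytes(spark, df_agg)))
```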

There is one more issue that I found in Spark 2:
broadcast does not work on cached data. This may not be the cause
in your case, but you can check whether you hit the same problem at your end.

https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
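
One way to check this at your end is to compare the physical plans of the join with and without .cache() on the aggregate. The sketch below is only an assumption about how to do that in PySpark: it reads the plan text through the private _jdf handle, and the helper names (plan_text, uses_broadcast_join, compare_cached_vs_uncached) are made up for illustration.

```python
def plan_text(df):
    """Return the physical plan of `df` as a string.

    Assumption: uses the private `_jdf` handle, which is not public API.
    """
    return df._jdf.queryExecution().executedPlan().toString()


def uses_broadcast_join(plan):
    """True if the plan text mentions a broadcast join operator."""
    return "BroadcastHashJoin" in plan or "BroadcastNestedLoopJoin" in plan


def compare_cached_vs_uncached(df, key="column1"):
    """Plan the same broadcast join before and after caching the aggregate."""
    from pyspark.sql.functions import broadcast  # imported lazily; needs PySpark

    agg = df.groupBy(key).count()
    # Plan the join first, while the aggregate is not yet marked as cached.
    uncached_plan = plan_text(df.join(broadcast(agg), key, "left_outer"))
    agg.cache()  # only marks the aggregate for caching; nothing runs yet
    cached_plan = plan_text(df.join(broadcast(agg), key, "left_outer"))
    agg.unpersist()
    return {
        "uncached": uses_broadcast_join(uncached_plan),
        "cached": uses_broadcast_join(cached_plan),
    }
```

If the cached variant reports False while the uncached one reports True, the cache is a likely suspect.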

Also, could you please tell me which issue solved in Spark 3 you are
referring to?

Regards
Amit


On Saturday, September 19, 2020, Rishi Shah <rishishah.s...@gmail.com>
wrote:

> Thanks Amit. I have tried increasing driver memory, and also tried increasing
> the max result size returned to the driver. Nothing works. I believe Spark is
> not able to determine that the result to be broadcast is small enough
> because the input data is huge? When I tried this in two stages (write out
> the grouped data, then use that to join using broadcast), Spark has no issues
> broadcasting it.
>
> When I was checking the Spark 3 documentation, it seemed like this issue may
> have been addressed in Spark 3 but not in earlier versions?
>
> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <mailtojoshia...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I think the problem lies with driver memory. Broadcast in Spark works by
>> collecting all the data to the driver, which then broadcasts it to all the
>> executors. A different strategy, such as BitTorrent, could be employed for
>> the transfer though.
>>
>> Please try increasing the driver memory. See if it works.
>>
>> Regards,
>> Amit
>>
>>
>> On Thursday, September 17, 2020, Rishi Shah <rishishah.s...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> Hope this email finds you well. I have a dataframe of size 8TB (parquet
>>> snappy compressed), however I group it by a column and get a much smaller
>>> aggregated dataframe of size 700 rows (just two columns, key and count).
>>> When I use it like below to broadcast this aggregated result, it throws
>>> an error saying the dataframe cannot be broadcast.
>>>
>>> from pyspark.sql.functions import broadcast
>>> df_agg = df.groupBy('column1').count().cache()
>>> # df_agg.count()
>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>> df_join.write.parquet('PATH')
>>>
>>> The same code works with input df size of 3TB without any modifications.
>>>
>>> Any suggestions?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>
> --
> Regards,
>
> Rishi Shah
>
