Hi haejoon,

I think you can check the discussion in
https://github.com/apache/spark/pull/27861

Best regards
Angers

Haejoon Lee <haejoon....@databricks.com> 于2021年8月30日周一 下午1:59写道:

> Hi all,
>
> I noticed that Spark uses only one partition when performing Window
> cumulative functions without specifying a partition, so the entire dataset
> is moved into a single partition, which easily causes OOM or serious
> performance degradation.
>
> See the example below:
>
> >>> from pyspark.sql import functions as F, Window
> >>> sdf = spark.range(10)
> >>> sdf.select(F.sum(sdf["id"]).over(Window.rowsBetween(Window.unboundedPreceding,
> ...     Window.currentRow))).show()
> ...
> WARN WindowExec: No Partition Defined for Window operation! Moving all data 
> to a single partition, this can cause serious performance degradation.
> ...
> +---------------------------------------------------------------+
> |sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
> +---------------------------------------------------------------+
> |                                                              0|
> |                                                              1|
> |                                                              3|
> |                                                              6|
> |                                                             10|
> |                                                             15|
> |                                                             21|
> |                                                             28|
> |                                                             36|
> |                                                             45|
> +---------------------------------------------------------------+
>
> As shown in the example, a window cumulative function needs the result of
> the previous row to compute the next one. When no partition is specified,
> Spark computes it by simply moving all the data into one partition.
>
> To overcome this, Dask, for example, introduces the concept of Overlapping
> Computations <https://docs.dask.org/en/latest/array-overlap.html>: the
> dataset is split into multiple blocks, the neighboring boundary regions of
> each block are copied, and the cumulative function is performed
> sequentially, block by block, so it still works when the dataset exceeds
> the memory size.
>
> Of course, this method incurs extra cost for creating the copies and for
> the communication between blocks, but it allows cumulative functions to run
> even when the size of the dataset exceeds the size of the memory, rather
> than causing an OOM.
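>
> To make the block-wise idea concrete, here is a toy sketch in plain NumPy
> (it is neither Dask's actual implementation nor Spark code): each block
> computes its local cumulative sum, and only the running total of the
> preceding blocks has to be carried over, so no single block ever needs to
> hold the whole dataset.
>
> >>> import numpy as np
> >>> blocks = np.array_split(np.arange(10), 3)   # pretend each block lives on a separate worker
> >>> local = [np.cumsum(b) for b in blocks]      # per-block cumulative sums
> >>> offsets = np.cumsum([0] + [b.sum() for b in blocks[:-1]])  # totals carried between blocks
> >>> np.concatenate([l + off for l, off in zip(local, offsets)])
> array([ 0,  1,  3,  6, 10, 15, 21, 28, 36, 45])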
>
> So it is a way to simply resolve the out-of-memory issue; it does not bring
> any performance advantage, though.
>
> I think this kind of use case is pretty common in data science, but I
> wonder how frequent it is among Apache Spark users.
>
> Would it be helpful to implement something like this in Apache Spark, so
> that Window cumulative functions can run on larger-than-memory data when no
> partition is specified?
>
> See here <https://github.com/databricks/koalas/issues/1386>, where the
> issue was first raised, for more detail.
>
>
> Best,
>
> Haejoon.
>
