Hi all,

I noticed that Spark uses only one partition when performing Window
cumulative functions without specifying a partition, so the entire dataset
is moved into a single partition, which easily causes OOM or serious
performance degradation.

See the example below:

>>> from pyspark.sql import functions as F, Window
>>> sdf = spark.range(10)
>>> sdf.select(F.sum(sdf["id"]).over(
...     Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
...
WARN WindowExec: No Partition Defined for Window operation! Moving all
data to a single partition, this can cause serious performance
degradation.
...
+---------------------------------------------------------------+
|sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+---------------------------------------------------------------+
|                                                              0|
|                                                              1|
|                                                              3|
|                                                              6|
|                                                             10|
|                                                             15|
|                                                             21|
|                                                             28|
|                                                             36|
|                                                             45|
+---------------------------------------------------------------+

As shown in the example, a window cumulative function needs the result for
the previous row in order to compute the next one. When no partition is
specified, Spark computes it by simply moving all the data into a single
partition.
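
For comparison, when a partition column is specified, Spark distributes the
window computation across partitions and the warning goes away. A minimal
sketch (the grouping column "g" below is purely illustrative and not part of
the example above):

>>> sdf = spark.range(10).withColumn("g", F.col("id") % 2)
>>> w = (Window.partitionBy("g")
...            .orderBy("id")
...            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
>>> sdf.select("g", "id", F.sum("id").over(w).alias("cumsum")).show()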

To overcome this, Dask, for example, introduces the concept of Overlapping
Computations <https://docs.dask.org/en/latest/array-overlap.html>, which
copies the dataset into multiple blocks that share overlapping boundaries
and performs the cumulative function sequentially across the blocks when
the dataset exceeds the memory size.
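
To illustrate the idea only (a rough sketch in plain Python, not Dask's
actual implementation), a cumulative sum can be computed block by block by
carrying the running total of each block into the next, so no single block
ever needs to hold the whole dataset:

>>> def blockwise_cumsum(blocks):
...     # Carry the running total across block boundaries so each
...     # block can be processed independently and in order.
...     carry = 0
...     for block in blocks:
...         out = []
...         for value in block:
...             carry += value
...             out.append(carry)
...         yield out
...
>>> list(blockwise_cumsum([[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]))
[[0, 1, 3], [6, 10, 15], [21, 28, 36, 45]]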

Of course, this method costs extra for creating the copies and for
communication between the blocks, but it allows cumulative functions to
complete even when the dataset exceeds the size of memory, rather than
failing with an OOM.

So it is simply a way to resolve the out-of-memory issue; it does not bring
any performance advantage, though.

I think this kind of use case is pretty common in data science, but I
wonder how frequent it is among Apache Spark users.

Would it be helpful to implement something like this in Apache Spark, so
that Window cumulative functions can run on larger-than-memory data when no
partition is specified?

See the Koalas issue <https://github.com/databricks/koalas/issues/1386>,
where this was first raised, for more detail.


Best,

Haejoon.
