Hi all,

I noticed that Spark uses only one partition when performing window cumulative functions without a partition specification, so the whole dataset is moved into a single partition, which easily causes OOM or serious performance degradation.
See the example below:

>>> from pyspark.sql import functions as F, Window
>>> sdf = spark.range(10)
>>> sdf.select(F.sum(sdf["id"]).over(Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
... WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------------------------------------------------------------+
|sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+---------------------------------------------------------------+
|                                                              0|
|                                                              1|
|                                                              3|
|                                                              6|
|                                                             10|
|                                                             15|
|                                                             21|
|                                                             28|
|                                                             36|
|                                                             45|
+---------------------------------------------------------------+

As the example shows, a window cumulative function needs the result for the previous row to compute the next one. When no partition is specified, Spark computes it by simply moving all the data into a single partition.

To overcome this, Dask, for example, introduces the concept of Overlapping Computations <https://docs.dask.org/en/latest/array-overlap.html>: when the dataset exceeds the available memory, it splits the data into multiple blocks that share small overlapping boundary regions, and performs the cumulative function on the blocks sequentially. Of course, this approach pays an extra cost for the copies and for communication between blocks, but it allows cumulative functions to run even when the dataset does not fit in memory, instead of causing an OOM. So it is simply a way to resolve the out-of-memory issue, without any performance advantage, though.

I think this kind of use case is pretty common in data science, but I wonder how frequent it is in Apache Spark. Would it be helpful to implement something similar in Apache Spark, so that window cumulative functions can run on larger-than-memory data without specifying a partition? To make the idea more concrete, I have appended a couple of rough sketches below my signature.

Check <https://github.com/databricks/koalas/issues/1386>, where the issue was first raised, for more detail.

Best,
Haejoon.
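
P.S. On the Dask side, here is a minimal illustration of my own (not taken from the Dask docs): a cumulative sum over a chunked array, which Dask evaluates block by block instead of gathering everything onto one worker.

import dask.array as da

# A 100M-element array split into 10 blocks; no single block ever has
# to hold the whole dataset in memory.
x = da.arange(100_000_000, chunks=10_000_000)

# cumsum() is evaluated per block: each block needs only its own data
# plus the running total carried over from the preceding blocks.
y = x.cumsum()

print(y[-1].compute())  # 4999999950000000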
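
And here is a very rough PySpark sketch of how the same two-pass idea could look on the DataFrame side: a per-partition cumulative sum first, then a small per-partition offset added back. All the column names here (pid, local_cumsum, ptotal, offset) are made up for illustration; this is a sketch of the idea, not a proposed implementation.

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10)  # keeps its original partitioning

# Pass 1: tag each row with its partition id and compute a cumulative
# sum within each partition, so no full-dataset shuffle is needed.
# (For spark.range, partition ids happen to follow the id order.)
part = sdf.withColumn("pid", F.spark_partition_id())
w_in = (Window.partitionBy("pid").orderBy("id")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow))
part = part.withColumn("local_cumsum", F.sum("id").over(w_in))

# Pass 2: compute each partition's total and turn the totals into a
# running offset. This window is unpartitioned too, but it only sees
# one row per partition, so the data it gathers is tiny.
totals = part.groupBy("pid").agg(F.sum("id").alias("ptotal"))
w_off = Window.orderBy("pid").rowsBetween(Window.unboundedPreceding, -1)
offsets = totals.withColumn(
    "offset", F.coalesce(F.sum("ptotal").over(w_off), F.lit(0)))

# Join the small offset table back and add it to the local results.
result = (part.join(offsets.select("pid", "offset"), "pid")
          .withColumn("cumsum", F.col("local_cumsum") + F.col("offset")))
result.orderBy("id").select("id", "cumsum").show()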