Hi Haejoon,

I think you can check the discussion in https://github.com/apache/spark/pull/27861.
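By the way, one way to distribute an unpartitioned cumulative window is a two-pass prefix sum: first compute one partial sum per partition, then run the local cumulative sum in each partition shifted by the total of the partitions before it. Below is a rough PySpark sketch of that idea only (the helper names are just for illustration, this is not what WindowExec actually does, and it leans on spark.range producing ordered, range-partitioned data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10)  # range-partitioned and ordered by "id"

# Pass 1: one partial sum per partition -- tiny, so it is safe to collect.
part_sums = dict(
    sdf.rdd.mapPartitionsWithIndex(
        lambda idx, rows: [(idx, sum(r["id"] for r in rows))]
    ).collect()
)

# Offset for each partition = sum of everything in the partitions before it.
offsets, running = {}, 0
for idx in sorted(part_sums):
    offsets[idx] = running
    running += part_sums[idx]

# Pass 2: local cumulative sum within each partition, shifted by its offset.
def local_cumsum(idx, rows):
    acc = offsets[idx]
    for r in rows:
        acc += r["id"]
        yield (r["id"], acc)

sdf.rdd.mapPartitionsWithIndex(local_cumsum).toDF(["id", "cum_sum"]).show()

Only the per-partition totals ever leave the executors here, so the full dataset never has to sit in a single partition.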
Best regards,
Angers

Haejoon Lee <haejoon....@databricks.com> wrote on Mon, Aug 30, 2021, 1:59 PM:

> Hi all,
>
> I noticed that Spark uses only one partition when performing Window
> cumulative functions without specifying the partition, so the whole
> dataset is moved into a single partition, which easily causes OOM or
> serious performance degradation.
>
> See the example below:
>
> >>> from pyspark.sql import functions as F, Window
> >>> sdf = spark.range(10)
> >>> sdf.select(F.sum(sdf["id"]).over(Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
> ...
> WARN WindowExec: No Partition Defined for Window operation! Moving all data
> to a single partition, this can cause serious performance degradation.
> ...
> +---------------------------------------------------------------+
> |sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
> +---------------------------------------------------------------+
> |                                                              0|
> |                                                              1|
> |                                                              3|
> |                                                              6|
> |                                                             10|
> |                                                             15|
> |                                                             21|
> |                                                             28|
> |                                                             36|
> |                                                             45|
> +---------------------------------------------------------------+
>
> As shown in the example, a window cumulative function needs the result of
> the previous row's operation for the next one. When no partition is
> specified, Spark computes it by simply moving all the data into one
> partition.
>
> To overcome this, Dask, for example, introduces the concept of Overlapping
> Computations <https://docs.dask.org/en/latest/array-overlap.html>, which
> copies the dataset into multiple blocks and performs the cumulative
> function over them sequentially when the dataset exceeds the memory size.
>
> Of course, this method costs more for creating the copies and for the
> communication between blocks, but it allows cumulative functions to be
> performed even when the dataset is larger than memory, rather than
> causing an OOM.
>
> So it is a way to simply resolve the out-of-memory issue, though without
> any performance advantage.
>
> I think this kind of use case is pretty common in data science, but I
> wonder how frequent these use cases are in Apache Spark.
>
> Would it be helpful to implement this in Apache Spark when running Window
> cumulative functions on out-of-memory data without specifying a partition?
>
> Check here <https://github.com/databricks/koalas/issues/1386>, where the
> issue was first raised, for more detail.
>
> Best,
>
> Haejoon.
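For reference, the chunked behaviour described above can be seen with a tiny Dask snippet. This is only an illustration: da.arange and cumsum are standard Dask array APIs, but the comment about how the per-chunk results are combined is my reading of the approach, not a statement about Dask internals.

import dask.array as da

# The same 0..9 data, split into chunks of 3 so no single chunk holds everything.
x = da.arange(10, chunks=3)

# The cumulative sum stays chunked; conceptually, each chunk computes a local
# prefix sum that is then shifted by the total of the chunks before it.
print(x.cumsum().compute())   # [ 0  1  3  6 10 15 21 28 36 45]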