You just have 1 partition here because the input is so small. You can always repartition this further for parallelism. Is the issue that you're not partitioning the window itself, maybe?
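To make that suggestion concrete, here is a minimal sketch (not from the original thread) that partitions the window by a hypothetical "grp" column; note that this turns the result into a per-group running sum rather than a global one:

    from pyspark.sql import SparkSession, functions as F, Window

    spark = SparkSession.builder.getOrCreate()

    # "grp" is a made-up grouping key, just for illustration
    sdf = spark.range(10).withColumn("grp", F.col("id") % 2)

    w = (Window.partitionBy("grp")
               .orderBy("id")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    # With the window partitioned by "grp", WindowExec no longer warns about
    # moving all data to a single partition.
    sdf.select("id", "grp", F.sum("id").over(w).alias("running_sum")).show()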
On Mon, Aug 30, 2021 at 12:59 AM Haejoon Lee <haejoon....@databricks.com> wrote:
> Hi all,
>
> I noticed that Spark uses only one partition when performing Window
> cumulative functions without specifying a partition, so the whole dataset
> is moved into a single partition, which easily causes OOM or serious
> performance degradation.
>
> See the example below:
>
> >>> from pyspark.sql import functions as F, Window
> >>> sdf = spark.range(10)
> >>> sdf.select(F.sum(sdf["id"]).over(Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
> ...
> WARN WindowExec: No Partition Defined for Window operation! Moving all data
> to a single partition, this can cause serious performance degradation.
> ...
> +---------------------------------------------------------------+
> |sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
> +---------------------------------------------------------------+
> |                                                              0|
> |                                                              1|
> |                                                              3|
> |                                                              6|
> |                                                             10|
> |                                                             15|
> |                                                             21|
> |                                                             28|
> |                                                             36|
> |                                                             45|
> +---------------------------------------------------------------+
>
> As shown in the example, a window cumulative function needs the result of
> the previous row to compute the next one. If no partition is specified,
> Spark computes it by simply moving all data to one partition.
>
> To overcome this, Dask, for example, introduces the concept of Overlapping
> Computations <https://docs.dask.org/en/latest/array-overlap.html>: when the
> dataset exceeds memory, it copies the data into multiple blocks and
> performs the cumulative function sequentially across them.
>
> Of course, this method costs more because of the copies and the
> communication between blocks, but it allows cumulative functions to run
> even when the dataset is larger than memory, rather than causing an OOM.
> So it simply resolves the out-of-memory issue, without any performance
> advantage.
>
> I think this kind of use case is pretty common in data science, but I
> wonder how frequent these use cases are in Apache Spark.
>
> Would it be helpful to implement something like this in Apache Spark for
> Window cumulative functions on out-of-memory data when no partition is
> specified?
>
> See <https://github.com/databricks/koalas/issues/1386>, where the issue
> was first raised, for more detail.
>
>
> Best,
>
> Haejoon.
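For reference, a minimal out-of-core sketch of the Dask behavior mentioned in the quoted mail (the values and chunk size are made up, and this only shows the user-facing result of a chunked cumulative sum, not the overlap mechanism itself):

    import dask.array as da

    # Four small blocks instead of a single partition
    x = da.arange(10, chunks=3)

    # Roughly, Dask computes a running sum per block and carries the block
    # totals forward, so the whole array never has to fit in one block.
    x.cumsum().compute()   # -> array([ 0,  1,  3,  6, 10, 15, 21, 28, 36, 45])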