Shardul Mahadik created SPARK-51582:
---------------------------------------

             Summary: Improve codegen for ExpandExec with high number of 
projections
                 Key: SPARK-51582
                 URL: https://issues.apache.org/jira/browse/SPARK-51582
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.5.5
            Reporter: Shardul Mahadik


Currently, ExpandExec uses a [switch-case based 
codegen|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L118]
 mechanism in which the amount of code generated is proportional to (N * O), 
where N is the number of rows produced per input row and O is the number of 
output columns. For a cubing operation (or other similar grouping-sets 
operations), which seems to be the main use case for Expand, the number of 
output rows per input row is N = 2^D, where D is the number of cubing 
dimensions. We find that the current codegen for ExpandExec fails for D as low 
as 10, because the generated code exceeds the 64KB JVM limit on bytecode per 
method. SPARK-35329 improves on this by moving the contents of each case block 
into its own method, but the code within the _consume_ method is still 
proportional to N (which can be much higher than O) and fails for D as low as 
12. This is unfortunately not sufficient for some of our use cases, where D 
can go as high as 16.

Instead of generating switch-cases, we can do the following:
 # Find all the unique individual expressions across [the 
projections|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L37].
 For most use cases, this number is much smaller than (N * O). E.g. for the 
cubing operation, the number of unique expressions is (N + O): O because there 
is one expression per output column (dimensions and values), and N because 
Spark assigns the literals 1…N to the rows of the Expand output. Consider
{code:java}
projections = [ [k1, k2, k3, val1, 1],
                [null, k2, k3, val1, 2],
                [k1, null, k3, val1, 3],
                .
                .
                [null, null, null, val1, 8] ]
{code}
then
{code:java}
uniqueExprs = [null, k1, k2, k3, val1, 1, 2, … N]
{code}

 # Create an expression map which follows the same structure as projections, 
but replaces expressions with their index in uniqueExprs
{code:java}
exprMap = [ [1, 2, 3, 4, 5],
            [0, 2, 3, 4, 6],
            [1, 0, 3, 4, 7],
            .
            .
            [0, 0, 0, 4, 12] ]
{code}

 # Calculate the value of all the unique expressions once per input row.
{code:java}
exprOutputs = [ eval(null), eval(k1), eval(k2), ….. ]
{code}

 # For every projection, for every output column, assign the value by looking 
up the corresponding value in exprOutputs using the index from exprMap

Pseudocode:
{code:java}
val uniqueExprs = distinct on projections.flatten
val exprMap = for each expr in projections, find its index in uniqueExprs

do_consume():
        val exprOutputs = evaluate all exprs in uniqueExprs on input row

        for (i : projections.indices) {
                // find value of output column by fetching the index from exprMap
                // and using the index to offset into exprOutputs
                index = exprMap[i][0]
                output[0] = exprOutputs[index]

                index = exprMap[i][1]
                output[1] = exprOutputs[index]
                .
                .
                .
   }

{code}
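As a concrete (non-codegen) illustration of steps 1-4, here is a small 
self-contained Java sketch. The class and method names are only illustrative, 
and string "expressions" with a stand-in eval() take the place of Catalyst 
expressions:
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpandSketch {

    // Step 1 (codegen time): collect the distinct expressions across all
    // projections, preserving first-seen order.
    static List<String> uniqueExprs(String[][] projections) {
        Map<String, Integer> seen = new LinkedHashMap<>();
        for (String[] proj : projections)
            for (String e : proj)
                seen.putIfAbsent(e, seen.size());
        return new ArrayList<>(seen.keySet());
    }

    // Step 2 (codegen time): rewrite each projection as indices into uniqueExprs.
    static int[][] exprMap(String[][] projections, List<String> unique) {
        int[][] map = new int[projections.length][];
        for (int i = 0; i < projections.length; i++) {
            map[i] = new int[projections[i].length];
            for (int j = 0; j < projections[i].length; j++)
                map[i][j] = unique.indexOf(projections[i][j]);
        }
        return map;
    }

    // Steps 3 + 4 (runtime, once per input row): evaluate each unique
    // expression exactly once, then assemble every output row by indexed
    // lookups. "eval(...)" is a stand-in for real expression evaluation.
    static List<String[]> expand(List<String> unique, int[][] map) {
        String[] exprOutputs = new String[unique.size()];
        for (int k = 0; k < unique.size(); k++)
            exprOutputs[k] = "eval(" + unique.get(k) + ")";

        List<String[]> rows = new ArrayList<>();
        for (int[] row : map) {
            String[] output = new String[row.length];
            for (int j = 0; j < row.length; j++)
                output[j] = exprOutputs[row[j]];
            rows.add(output);
        }
        return rows;
    }

    public static void main(String[] args) {
        // D = 2 cube dimensions (k1, k2) => N = 4 projections
        String[][] projections = {
            {"k1",   "k2",   "val1", "1"},
            {"null", "k2",   "val1", "2"},
            {"k1",   "null", "val1", "3"},
            {"null", "null", "val1", "4"},
        };
        List<String> unique = uniqueExprs(projections);
        int[][] map = exprMap(projections, unique);
        for (String[] row : expand(unique, map))
            System.out.println(String.join(", ", row));
    }
}
{code}
For this D = 2 cube, uniqueExprs has N + O = 8 entries, and each output row is 
assembled purely through index lookups rather than per-projection generated code.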
With this approach, steps 1 and 2 are performed during codegen and their 
outputs are added to reference objects that can be accessed during code 
execution. Also, step 3 can be further optimized by evaluating the literals in 
uniqueExprs once during codegen and only generating code to evaluate the 
non-literal expressions at runtime. In that case, the amount of code generated 
will be roughly proportional to (2 * O): one O for step 3 (assuming roughly one 
non-literal expression per output column) and another O for step 4.
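The literal-folding optimization could be sketched as follows (illustrative 
only; isLiteral is a hypothetical stand-in for checking whether a Catalyst 
expression is a Literal): partition uniqueExprs into literal and non-literal 
slots at codegen time, pre-fill the literal slots once, and only re-evaluate 
the non-literal slots per input row:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class LiteralFolding {
    // Hypothetical helper: treat "null" and purely numeric strings as literals.
    static boolean isLiteral(String e) {
        return e.equals("null") || e.chars().allMatch(Character::isDigit);
    }

    public static void main(String[] args) {
        List<String> uniqueExprs = List.of("k1", "k2", "val1", "1", "null", "2");

        // Codegen time: fold literals into exprOutputs once and remember
        // which slots hold non-literal expressions.
        String[] exprOutputs = new String[uniqueExprs.size()];
        List<Integer> nonLiteralSlots = new ArrayList<>();
        for (int k = 0; k < uniqueExprs.size(); k++) {
            if (isLiteral(uniqueExprs.get(k)))
                exprOutputs[k] = uniqueExprs.get(k);   // folded constant
            else
                nonLiteralSlots.add(k);
        }

        // Runtime, per input row: only the non-literal slots are recomputed,
        // so the code generated for step 3 is proportional to O, not N + O.
        for (int k : nonLiteralSlots)
            exprOutputs[k] = "eval(" + uniqueExprs.get(k) + ")";

        System.out.println(String.join(", ", exprOutputs));
    }
}
{code}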

Performance:

Some initial benchmarking suggests that this approach is ~25% slower than the 
switch-case codegen when D is small (D <= 8); I assume this is due to the cost 
of the additional array accesses compared to switch-cases. For D > 8, the 
proposed approach is around 2x faster (this needs more analysis to understand 
why the performance of the switch-case approach falls drastically at D > 8). 
For D > 11, the switch-case approach fails entirely due to the JVM method size 
limit, while the proposed approach continues to scale, with runtime 
proportional to 2^D.

I wanted to get some thoughts on whether this approach is reasonable and is 
something that can be added to Spark. We could choose to use this approach only 
for cases with a large number of projections. The code needs some cleanup and
more testing before I can raise a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
