Shardul Mahadik created SPARK-51582: ---------------------------------------
             Summary: Improve codegen for ExpandExec with high number of projections
                 Key: SPARK-51582
                 URL: https://issues.apache.org/jira/browse/SPARK-51582
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.5.5
            Reporter: Shardul Mahadik

Currently, ExpandExec uses a [switch-case based codegen|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L118] mechanism where the amount of generated code is proportional to (N * O), where N is the number of output rows produced per input row and O is the number of output columns.

For a cubing operation (or other similar grouping sets operations), which seems to be the main use case for Expand, the number of output rows per input row is N = 2^D, where D is the number of cubing dimensions. We find that the current codegen for ExpandExec fails for D as low as 10, because the generated code hits the 64KB JVM limit on bytecode per method. SPARK-35329 tries to improve on this by moving the contents of each case block into its own method, but the code within the _consume_ method is still proportional to N (which can be much higher than O) and fails for D as low as 12. This is unfortunately not sufficient for some of our use cases, where D can go as high as 16.

Instead of generating switch-cases, we can do the following (a runnable sketch of all four steps follows the list):
# Find all the unique individual expressions across [the projections|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L37]. For most use cases, this number is much smaller than (N * O). E.g. for the cubing operation, the number of unique expressions is roughly (N + O): O because there is one expression per output column (dimensions and values), and N because Spark assigns the literals 1…N to the rows of the Expand output. Consider
{code:java}
projections = [
  [k1,   k2,   k3,   val1, 1],
  [null, k2,   k3,   val1, 2],
  [k1,   null, k3,   val1, 3],
  .
  .
  [null, null, null, val1, 8]
]
{code}
then
{code:java}
uniqueExprs = [null, k1, k2, k3, val1, 1, 2, … N]
{code}
# Create an expression map which follows the same structure as projections, but replaces each expression with its index in uniqueExprs:
{code:java}
exprMap = [
  [1, 2, 3, 4, 5],
  [0, 2, 3, 4, 6],
  [1, 0, 3, 4, 7],
  .
  .
  [0, 0, 0, 4, 12]
]
{code}
# Calculate the value of all the unique expressions once per input row:
{code:java}
exprOutputs = [ eval(null), eval(k1), eval(k2), … ]
{code}
# For every projection and every output column, assign the output value by using the corresponding index from exprMap to look up the value in exprOutputs. Pseudocode:
{code:java}
val uniqueExprs = projections.flatten.distinct
val exprMap = for each expr in projections, find its index in uniqueExprs

do_consume():
  val exprOutputs = evaluate all exprs in uniqueExprs on the input row
  for (i <- projections.indices) {
    // find the value of each output column by fetching the index from
    // exprMap and using it to offset into exprOutputs
    output(0) = exprOutputs(exprMap(i)(0))
    output(1) = exprOutputs(exprMap(i)(1))
    .
    .
    .
  }
{code}
With this approach, steps 1 and 2 are performed during codegen and their outputs are stored in reference objects that the generated code can access at execution time. We can further optimize step 3 by evaluating the literals in uniqueExprs during codegen and only generating code to evaluate the non-literal expressions.
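To make this concrete, here is a minimal, self-contained Scala model of steps 1–4 (illustrative only: the actual change would emit Java through codegen, and the Expr/Col/Lit types below are hypothetical stand-ins for Catalyst expressions):
{code:java}
object ExpandIndexModel {
  // Hypothetical stand-ins for Catalyst expressions
  sealed trait Expr
  case class Col(i: Int) extends Expr   // reads column i of the input row
  case class Lit(v: Any) extends Expr   // a constant
  case object NullLit extends Expr      // the null literal

  def eval(e: Expr, row: Array[Any]): Any = e match {
    case Col(i)  => row(i)
    case Lit(v)  => v
    case NullLit => null
  }

  def main(args: Array[String]): Unit = {
    // Projections for a 2-dimension cube over (k1, k2) with one value column,
    // matching the shape above (N = 4 output rows per input row, O = 4 columns)
    val projections: Seq[Seq[Expr]] = Seq(
      Seq(Col(0),  Col(1),  Col(2), Lit(1)),
      Seq(NullLit, Col(1),  Col(2), Lit(2)),
      Seq(Col(0),  NullLit, Col(2), Lit(3)),
      Seq(NullLit, NullLit, Col(2), Lit(4)))

    // Steps 1 and 2: computed once (at codegen time in the real implementation)
    val uniqueExprs: IndexedSeq[Expr] = projections.flatten.distinct.toIndexedSeq
    val indexOf: Map[Expr, Int] = uniqueExprs.zipWithIndex.toMap
    val exprMap: Array[Array[Int]] =
      projections.map(p => p.map(indexOf).toArray).toArray

    // Steps 3 and 4: once per input row, evaluate each unique expression a
    // single time, then fill every projection's output via index lookups
    val inputRow: Array[Any] = Array("a", "b", 10)
    val exprOutputs: Array[Any] = uniqueExprs.map(eval(_, inputRow)).toArray
    for (proj <- exprMap) {
      val outputRow = proj.map(idx => exprOutputs(idx))
      println(outputRow.mkString("[", ", ", "]"))
    }
  }
}
{code}
Running this prints the four expanded rows for the single input row ([a, b, 10, 1] through [null, null, 10, 4]). The point of the model is that the only per-row work needing generated code is the uniqueExprs evaluation plus the fixed-size loop body, which is what keeps the code size proportional to O rather than (N * O).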
In this case, the amount of code generated will be roughly proportional to (2 * O): one O for step 3, assuming one expression per output column, and another O for step 4.

Performance: some initial benchmarking suggests that this approach is ~25% slower than switch-cases when D is small (D <= 8); I assume this is due to the cost of the additional array accesses compared to switch-cases. For D > 8, the proposed approach is around 2x faster (this needs more analysis to understand why the performance of switch-cases drops so drastically at D > 8). For D > 11, the switch-case approach fails entirely due to the JVM method size limit, while the proposed approach continues to scale, with performance proportional to 2^D.

I wanted to get some thoughts on whether this approach is reasonable and is something that could be added to Spark. We could choose to use this approach only for cases with a large number of projections. The code needs some cleanup and more testing before I can raise a PR.
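For reference, an illustrative query shape that exercises this code path (the table and column names here are made up; any cube over D grouping columns produces an Expand with 2^D projections):
{code:java}
import org.apache.spark.sql.functions.sum

// Assumption: a table `t` with grouping columns d1..d12 and a numeric column v.
// A 12-dimension cube yields an ExpandExec with 2^12 = 4096 projections,
// well past the D = 10 point where the generated code reportedly exceeds
// the 64KB JVM method bytecode limit.
val dims = (1 to 12).map(i => s"d$i")
spark.table("t")
  .cube(dims.head, dims.tail: _*)
  .agg(sum("v"))
  .collect()
{code}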