[ https://issues.apache.org/jira/browse/SPARK-53634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuki Sato updated SPARK-53634:
------------------------------
    Description:

I am reporting this as a bug, but as I cannot determine for sure, I would like to confirm whether this behavior is intended or a bug.

*Issue*

Under certain conditions, some of the columns specified in the Sort node created by sortWithinPartitions are dropped during execution plan optimization. The following code snippet reproduces the issue:
{code:java}
import org.apache.spark.sql.functions.{col, lit}

val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")
val csvUnionDF = csvOutputDF1.union(csvOutputDF2)

csvUnionDF
  .withColumn("path2", lit("e"))
  .withColumn("path3", lit("f"))
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
  )
  .explain(true)
{code}
The execution plan produced by the above code is as follows:
{code:java}
== Parsed Logical Plan ==
'Sort ['path1 ASC NULLS FIRST, 'path2 ASC NULLS FIRST, 'path3 ASC NULLS FIRST, 'value ASC NULLS FIRST], false
+- Project [value#42, path1#43, path2#59, f AS path3#63]
   +- Project [value#42, path1#43, e AS path2#59]
      +- Union false, false
         :- Project [_1#37 AS value#42, _2#38 AS path1#43]
         :  +- LocalRelation [_1#37, _2#38]
         +- Project [_1#48 AS value#53, _2#49 AS path1#54]
            +- LocalRelation [_1#48, _2#49]

== Analyzed Logical Plan ==
value: int, path1: string, path2: string, path3: string
Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, path3#63 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
+- Project [value#42, path1#43, path2#59, f AS path3#63]
   +- Project [value#42, path1#43, e AS path2#59]
      +- Union false, false
         :- Project [_1#37 AS value#42, _2#38 AS path1#43]
         :  +- LocalRelation [_1#37, _2#38]
         +- Project [_1#48 AS value#53, _2#49 AS path1#54]
            +- LocalRelation [_1#48, _2#49]

== Optimized Logical Plan ==
Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
+- Union false, false
   :- LocalRelation [value#42, path1#43, path2#59, path3#63]
   +- LocalRelation [value#53, path1#54, path2#68, path3#69]

== Physical Plan ==
*(1) Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false, 0
+- Union
   :- LocalTableScan [value#42, path1#43, path2#59, path3#63]
   +- LocalTableScan [value#53, path1#54, path2#68, path3#69]
{code}
The column "path3" has already disappeared from the Sort node in the Optimized Logical Plan.

This becomes a problem when using partitionBy, as shown below:
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

val publishDir = "output/csv_data"
val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")
val csvUnionDF = csvOutputDF1.union(csvOutputDF2)

csvUnionDF
  .withColumn("path2", lit("e"))
  .withColumn("path3", lit("f"))
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
  )
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("path1", "path2", "path3")
  .csv(publishDir)
{code}
When using V1Writes, isOrderingMatched compares the outputOrdering produced by the Sort node from sortWithinPartitions with the requiredOrdering derived from partitionBy. Since "path3" is missing from the output ordering, the check fails: the Sort generated for partitionBy is selected, the sort specified by sortWithinPartitions is discarded, and the user-specified sort order is broken. Although partitionBy itself does not guarantee that the sort order is preserved, from the user's perspective this behavior appears strange.
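For reference, here is a minimal standalone sketch of the prefix comparison described above (my own simplification over plain column names, not the actual V1WritesUtils source): the ordering required by partitionBy must be a prefix of the ordering the child plan already provides, otherwise V1Writes inserts its own Sort.
{code:java}
// Simplified model of the V1Writes ordering check, using plain column names
// instead of Catalyst expressions.
def isOrderingMatched(
    requiredOrdering: Seq[String],  // partition columns, in partitionBy order
    outputOrdering: Seq[String]     // sort keys remaining after optimization
): Boolean = {
  requiredOrdering.length <= outputOrdering.length &&
    requiredOrdering.zip(outputOrdering).forall { case (r, o) => r == o }
}

// With "path3" eliminated from the user's Sort, the check fails:
isOrderingMatched(
  Seq("path1", "path2", "path3"),  // requiredOrdering from partitionBy
  Seq("path1", "path2", "value")   // outputOrdering after optimization
) // => false, so a Sort on (path1, path2, path3) replaces the user's sort
{code}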
*Root cause*

The root cause lies in three optimization rules (PushProjectionThroughUnion / FoldablePropagation / EliminateSorts) and the order in which they are applied. Within each pass, the rules run in the order PushProjectionThroughUnion → FoldablePropagation → EliminateSorts. As a result, the literal column added by the first withColumn on csvUnionDF is no longer subject to FoldablePropagation, because PushProjectionThroughUnion has already pushed its alias below the Union, whereas the literal column added by the second withColumn is still subject to FoldablePropagation, is recognized as a foldable literal, and is then removed from the Sort by EliminateSorts.
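To see the FoldablePropagation + EliminateSorts interaction in isolation (without the Union), here is a minimal demo; the expected output is my reading of the rules, so treat it as a sketch:
{code:java}
import org.apache.spark.sql.functions.{col, lit}

// A sort key that the optimizer can prove is a foldable literal is removed
// from the Sort node by EliminateSorts.
spark.range(3)
  .withColumn("c", lit("x"))
  .sortWithinPartitions(col("c").asc, col("id").asc)
  .explain(true)
// Expected: the Optimized Logical Plan sorts by id only; the literal key "c"
// is eliminated. In the union reproduction above, the same mechanism removes
// path3, while path2 survives because PushProjectionThroughUnion has already
// pushed its alias below the Union before FoldablePropagation runs.
{code}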
*Affected versions*

I have confirmed this in versions 3.5.4 and 4.0.0, but presumably all Spark versions are affected. The observable behavior differs around version 3.4.0, where changes to V1Writes were introduced:
- 3.4.0 and later (default: spark.sql.optimizer.plannedWrite.enabled=true): in the execution plan, the Sort generated for partitionBy is selected, and the Sort from sortWithinPartitions is ignored.
- Prior to 3.4.0 (equivalent to spark.sql.optimizer.plannedWrite.enabled=false in 3.4.0 and later): a Sort generated for partitionBy is added internally to the execution plan, so in the example code above both the Sort from sortWithinPartitions and the Sort for partitionBy are executed redundantly.
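For completeness, the pre-3.4.0 write path can be restored via the configuration mentioned above (a sketch; as described, the two sorts then simply run redundantly):
{code:java}
// Fall back to the pre-3.4.0 behavior, in which a Sort on the partition
// columns is added internally at write time, running in addition to the
// user's sortWithinPartitions.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
{code}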
I am relatively new to the Spark community, so I cannot determine whether this is intended behavior or a bug that should be fixed. If it is to be fixed as a bug, I will consider working on it.

> Missing Sort Columns in Execution Plan Optimization with union + withColumn + sortWithinPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53634
>                 URL: https://issues.apache.org/jira/browse/SPARK-53634
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.5.4, 4.0.0
>            Reporter: Yuki Sato
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)