[ 
https://issues.apache.org/jira/browse/SPARK-51738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoyu Weng updated SPARK-51738:
-------------------------------
    Description: 
First bad commit: 01adf10405f from [[SPARK-50256][SQL] Add lightweight 
validation to check if a logical plan becomes unresolved after every optimizer 
rule|https://github.com/apache/spark/pull/48787]

Repro:
{code:sql}
select foo in (select struct(1 a)) from (select struct(1 b) foo)
{code}
Before bad commit:
{code:java}
+----------------------+
|(foo IN (listquery()))|
+----------------------+
|                  true|
+----------------------+
{code}
Since bad commit:
{code:java}
org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule 
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch 
RewriteSubquery generated an invalid plan: The plan was previously resolved and 
now became unresolved. SQLSTATE: XXKD0
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:260)
        at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:233)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:225)
        at scala.collection.immutable.List.foreach(List.scala:334)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:225)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:195)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:195)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$2(QueryExecution.scala:179)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:255)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:631)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:255)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
        at 
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:254)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$1(QueryExecution.scala:175)
        at scala.util.Try$.apply(Try.scala:217)
        at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1375)
        at 
org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1429)
        at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:225)
        at 
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:274)
        at 
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:321)
        at 
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:289)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:147)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:264)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:123)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:287)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:229)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2165)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1336)
        at org.apache.spark.sql.api.Dataset.take(Dataset.scala:2705)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:307)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:343)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:833)
{code}

  was:
First bad commit: 01adf10405f from https://github.com/apache/spark/pull/48787

Repro:
{code:sql}
select foo in (select struct(1 a)) from (select struct(1 b) foo)
{code}
Before bad commit:
{code:java}
+----------------------+
|(foo IN (listquery()))|
+----------------------+
|                  true|
+----------------------+
{code}
Since bad commit:
{code:java}
org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule 
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch 
RewriteSubquery generated an invalid plan: The plan was previously resolved and 
now became unresolved. SQLSTATE: XXKD0
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:260)
        at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:233)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:225)
        at scala.collection.immutable.List.foreach(List.scala:334)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:225)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:195)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:195)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$2(QueryExecution.scala:179)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:255)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:631)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:255)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
        at 
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:254)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$1(QueryExecution.scala:175)
        at scala.util.Try$.apply(Try.scala:217)
        at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1375)
        at 
org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1429)
        at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:225)
        at 
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:274)
        at 
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:321)
        at 
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:289)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:147)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:264)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:123)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:287)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:229)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2165)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1336)
        at org.apache.spark.sql.api.Dataset.take(Dataset.scala:2705)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:307)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:343)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:833)
{code}


> IN subquery with struct of mismatching field names fails with 
> PLAN_VALIDATION_FAILED_RULE_IN_BATCH
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-51738
>                 URL: https://issues.apache.org/jira/browse/SPARK-51738
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Haoyu Weng
>            Priority: Major
>
> First bad commit: 01adf10405f from [[SPARK-50256][SQL] Add lightweight 
> validation to check if a logical plan becomes unresolved after every 
> optimizer rule|https://github.com/apache/spark/pull/48787]
> Repro:
> {code:sql}
> select foo in (select struct(1 a)) from (select struct(1 b) foo)
> {code}
> Before bad commit:
> {code:java}
> +----------------------+
> |(foo IN (listquery()))|
> +----------------------+
> |                  true|
> +----------------------+
> {code}
> Since bad commit:
> {code:java}
> org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule 
> org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch 
> RewriteSubquery generated an invalid plan: The plan was previously resolved 
> and now became unresolved. SQLSTATE: XXKD0
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:260)
>       at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:233)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:225)
>       at scala.collection.immutable.List.foreach(List.scala:334)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:225)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:195)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:195)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$2(QueryExecution.scala:179)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:255)
>       at 
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:631)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:255)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:254)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyOptimizedPlan$1(QueryExecution.scala:175)
>       at scala.util.Try$.apply(Try.scala:217)
>       at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1375)
>       at 
> org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1429)
>       at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:225)
>       at 
> org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:274)
>       at 
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:321)
>       at 
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:289)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:147)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:264)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:123)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:287)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:752)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:229)
>       at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2165)
>       at org.apache.spark.sql.Dataset.head(Dataset.scala:1336)
>       at org.apache.spark.sql.api.Dataset.take(Dataset.scala:2705)
>       at org.apache.spark.sql.Dataset.getRows(Dataset.scala:307)
>       at org.apache.spark.sql.Dataset.showString(Dataset.scala:343)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>       at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>       at py4j.Gateway.invoke(Gateway.java:282)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at 
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>       at java.base/java.lang.Thread.run(Thread.java:833)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to