MaxGekk commented on code in PR #49658:
URL: https://github.com/apache/spark/pull/49658#discussion_r1935368749


##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ViewResolverSuite.scala:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.analysis.resolver
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.resolver.{MetadataResolver, Resolver}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{
+  LocalRelation,
+  LogicalPlan,
+  OneRowRelation,
+  Project,
+  SubqueryAlias,
+  View
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class ViewResolverSuite extends QueryTest with SharedSparkSession {
+  private val catalogName =
+    "spark_catalog"
+  private val col1Integer =
+    AttributeReference(name = "col1", dataType = IntegerType, nullable = 
false)()
+  private val col2String =
+    AttributeReference(name = "col2", dataType = StringType, nullable = 
false)()
+
+  test("Temporary view") {
+    withView("temporary_view") {
+      spark.sql("CREATE TEMPORARY VIEW temporary_view AS SELECT col1, col2 
FROM VALUES (1, 'a');")
+
+      checkViewResolution(
+        "SELECT * FROM temporary_view",
+        expectedChild = Project(
+          projectList = Seq(
+            Alias(Cast(col1Integer, IntegerType).withTimeZone(conf.sessionLocalTimeZone), "col1")(),
+            Alias(Cast(col2String, StringType).withTimeZone(conf.sessionLocalTimeZone), "col2")()
+          ),
+          child = Project(
+            projectList = Seq(col1Integer, col2String),
+            child = LocalRelation(
+              output = Seq(col1Integer, col2String),
+              data = Seq(
+                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst(_)))

Review Comment:
   ```suggestion
                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst))
   ```
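
   For context on why the suggested form works, a minimal sketch assuming plain Scala 2.13 semantics (and that `convertToCatalyst` is not ambiguously overloaded at this call site): passing a method where a function is expected triggers eta-expansion, so the explicit placeholder is redundant and all three forms below are equivalent.
   ```scala
   import org.apache.spark.sql.catalyst.CatalystTypeConverters

   val values = Seq(1, "a") // hypothetical sample values
   values.map(CatalystTypeConverters.convertToCatalyst(_)) // explicit placeholder
   values.map(CatalystTypeConverters.convertToCatalyst _)  // explicit eta-expansion
   values.map(CatalystTypeConverters.convertToCatalyst)    // automatic eta-expansion
   ```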



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala:
##########
@@ -103,263 +104,113 @@ class NameScopeSuite extends PlanTest with SQLConfHelper {
   test("Empty scope") {
     val nameScope = new NameScope
 
-    assert(nameScope.getAllAttributes.isEmpty)
+    assert(nameScope.output.isEmpty)
 
-    assert(nameScope.matchMultipartName(Seq("col1")) == NameTarget(candidates 
= Seq.empty))
-  }
-
-  test("Single unnamed plan") {
-    val nameScope = new NameScope
-
-    nameScope += Seq(col1Integer, col2Integer, col3Boolean)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean))
-
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col1",
+      candidates = Seq.empty
     )
   }
 
-  test("Several unnamed plans") {
-    val nameScope = new NameScope
-
-    nameScope += Seq(col1Integer)
-    nameScope += Seq(col2Integer, col3Boolean)
-    nameScope += Seq(col4String)
+  test("Distinct attributes") {
+    val nameScope = new NameScope(Seq(col1Integer, col2Integer, col3Boolean, col4String))
 
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean, col4String))
+    assert(nameScope.output == Seq(col1Integer, col2Integer, col3Boolean, col4String))
 
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col1",
+      candidates = Seq(col1Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq(col4String),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col5")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-  }
-
-  test("Single named plan") {
-    val nameScope = new NameScope
-
-    nameScope("table1") = Seq(col1Integer, col2Integer, col3Boolean)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean))
-
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col2",
+      candidates = Seq(col2Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col3",
+      candidates = Seq(col3Boolean)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col4",
+      candidates = Seq(col4String)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col5",
+      candidates = Seq.empty
     )
   }
 
-  test("Several named plans") {
-    val nameScope = new NameScope
+  test("Duplicate attribute names") {
+    val nameScope = new NameScope(Seq(col1Integer, col1Integer, col1IntegerOther))
 
-    nameScope("table1") = Seq(col1Integer)
-    nameScope("table2") = Seq(col2Integer, col3Boolean)
-    nameScope("table2") = Seq(col4String)
-    nameScope("table3") = Seq(col5String)

Review Comment:
   Is this:
   ```
     private val col5String = AttributeReference(name = "col5", dataType = 
StringType)()
   ```
   used somewhere after the deletion?
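
   (If it is unused, one way to catch such leftovers automatically is the Scala 2.13 unused-privates lint; a hypothetical build tweak, not part of this PR:)
   ```scala
   // In the sbt build: ask scalac to warn when a private member is never used,
   // so dead test fixtures like col5String surface at compile time.
   scalacOptions += "-Wunused:privates"
   ```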



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ViewResolverSuite.scala:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.analysis.resolver
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.resolver.{MetadataResolver, Resolver}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{
+  LocalRelation,
+  LogicalPlan,
+  OneRowRelation,
+  Project,
+  SubqueryAlias,
+  View
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class ViewResolverSuite extends QueryTest with SharedSparkSession {
+  private val catalogName =
+    "spark_catalog"
+  private val col1Integer =
+    AttributeReference(name = "col1", dataType = IntegerType, nullable = 
false)()
+  private val col2String =
+    AttributeReference(name = "col2", dataType = StringType, nullable = 
false)()
+
+  test("Temporary view") {
+    withView("temporary_view") {
+      spark.sql("CREATE TEMPORARY VIEW temporary_view AS SELECT col1, col2 
FROM VALUES (1, 'a');")
+
+      checkViewResolution(
+        "SELECT * FROM temporary_view",
+        expectedChild = Project(
+          projectList = Seq(
+            Alias(Cast(col1Integer, IntegerType).withTimeZone(conf.sessionLocalTimeZone), "col1")(),
+            Alias(Cast(col2String, StringType).withTimeZone(conf.sessionLocalTimeZone), "col2")()
+          ),
+          child = Project(
+            projectList = Seq(col1Integer, col2String),
+            child = LocalRelation(
+              output = Seq(col1Integer, col2String),
+              data = Seq(
+                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst(_)))
+              )
+            )
+          )
+        )
+      )
+    }
+  }
+
+  test("Persistent view") {
+    withView("persistent_view") {
+      spark.sql("CREATE VIEW persistent_view AS SELECT col1, col2 FROM VALUES 
(1, 'a');")
+
+      checkViewResolution(
+        "SELECT * FROM persistent_view",
+        expectedChild = Project(
+          projectList = Seq(
+            Alias(Cast(col1Integer, IntegerType).withTimeZone(conf.sessionLocalTimeZone), "col1")(),
+            Alias(Cast(col2String, StringType).withTimeZone(conf.sessionLocalTimeZone), "col2")()
+          ),
+          child = Project(
+            projectList = Seq(col1Integer, col2String),
+            child = LocalRelation(
+              output = Seq(col1Integer, col2String),
+              data = Seq(
+                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst(_)))

Review Comment:
   ```suggestion
                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst))
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala:
##########
@@ -103,263 +104,113 @@ class NameScopeSuite extends PlanTest with SQLConfHelper {
   test("Empty scope") {
     val nameScope = new NameScope
 
-    assert(nameScope.getAllAttributes.isEmpty)
+    assert(nameScope.output.isEmpty)
 
-    assert(nameScope.matchMultipartName(Seq("col1")) == NameTarget(candidates 
= Seq.empty))
-  }
-
-  test("Single unnamed plan") {
-    val nameScope = new NameScope
-
-    nameScope += Seq(col1Integer, col2Integer, col3Boolean)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean))
-
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col1",
+      candidates = Seq.empty
     )
   }
 
-  test("Several unnamed plans") {
-    val nameScope = new NameScope
-
-    nameScope += Seq(col1Integer)
-    nameScope += Seq(col2Integer, col3Boolean)
-    nameScope += Seq(col4String)
+  test("Distinct attributes") {
+    val nameScope = new NameScope(Seq(col1Integer, col2Integer, col3Boolean, col4String))
 
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean, col4String))
+    assert(nameScope.output == Seq(col1Integer, col2Integer, col3Boolean, col4String))
 
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col1",
+      candidates = Seq(col1Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq(col4String),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col5")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
-    )
-  }
-
-  test("Single named plan") {
-    val nameScope = new NameScope
-
-    nameScope("table1") = Seq(col1Integer, col2Integer, col3Boolean)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean))
-
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col2",
+      candidates = Seq(col2Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col3",
+      candidates = Seq(col3Boolean)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col4",
+      candidates = Seq(col4String)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col5",
+      candidates = Seq.empty
     )
   }
 
-  test("Several named plans") {
-    val nameScope = new NameScope
+  test("Duplicate attribute names") {
+    val nameScope = new NameScope(Seq(col1Integer, col1Integer, col1IntegerOther))
 
-    nameScope("table1") = Seq(col1Integer)
-    nameScope("table2") = Seq(col2Integer, col3Boolean)
-    nameScope("table2") = Seq(col4String)
-    nameScope("table3") = Seq(col5String)
-
-    assert(
-      nameScope.getAllAttributes == Seq(
-        col1Integer,
-        col2Integer,
-        col3Boolean,
-        col4String,
-        col5String
-      )
-    )
+    assert(nameScope.output == Seq(col1Integer, col1Integer, col1IntegerOther))
 
-    assert(
-      nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-        candidates = Seq(col1Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col2")) == NameTarget(
-        candidates = Seq(col2Integer),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col4")) == NameTarget(
-        candidates = Seq(col4String),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col5")) == NameTarget(
-        candidates = Seq(col5String),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
-    )
-    assert(
-      nameScope.matchMultipartName(Seq("col6")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String, col5String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col1",
+      candidates = Seq(col1Integer, col1Integer, col1IntegerOther)
     )
   }
 
-  test("Named and unnamed plans with case insensitive comparison") {
+  test("Case insensitive comparison") {
     val col1Integer = AttributeReference(name = "Col1", dataType = 
IntegerType)()
     val col2Integer = AttributeReference(name = "col2", dataType = 
IntegerType)()
     val col3Boolean = AttributeReference(name = "coL3", dataType = 
BooleanType)()
+    val col3BooleanOther = AttributeReference(name = "Col3", dataType = 
BooleanType)()
     val col4String = AttributeReference(name = "Col4", dataType = StringType)()
 
-    val nameScope = new NameScope
-
-    nameScope("TaBle1") = Seq(col1Integer)
-    nameScope("table2") = Seq(col2Integer, col3Boolean)
-    nameScope += Seq(col4String)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col2Integer, col3Boolean, col4String))
+    val nameScope =
+      new NameScope(
+        Seq(col1Integer, col3Boolean, col2Integer, col2Integer, col3BooleanOther, col4String)
+      )
 
     assert(
-      nameScope.matchMultipartName(Seq("cOL1")) == NameTarget(
-        candidates = Seq(col1Integer.withName("cOL1")),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
+      nameScope.output == Seq(
+        col1Integer,
+        col3Boolean,
+        col2Integer,
+        col2Integer,
+        col3BooleanOther,
+        col4String
       )
     )
-    assert(
-      nameScope.matchMultipartName(Seq("CoL2")) == NameTarget(
-        candidates = Seq(col2Integer.withName("CoL2")),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+
+    checkOnePartNameLookup(
+      nameScope,
+      name = "cOL1",
+      candidates = Seq(col1Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col3")) == NameTarget(
-        candidates = Seq(col3Boolean.withName("col3")),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "CoL2",
+      candidates = Seq(col2Integer, col2Integer)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("COL4")) == NameTarget(
-        candidates = Seq(col4String.withName("COL4")),
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col3",
+      candidates = Seq(col3Boolean, col3BooleanOther)
     )
-    assert(
-      nameScope.matchMultipartName(Seq("col5")) == NameTarget(
-        candidates = Seq.empty,
-        allAttributes = Seq(col1Integer, col2Integer, col3Boolean, col4String)
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "COL4",
+      candidates = Seq(col4String)
     )
-  }
-
-  test("Duplicate attribute names from one plan") {
-    val nameScope = new NameScope
-
-    nameScope("table1") = Seq(col1Integer, col1Integer)
-    nameScope("table1") = Seq(col1IntegerOther)
-
-    assert(nameScope.getAllAttributes == Seq(col1Integer, col1Integer, col1IntegerOther))
-
-    nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-      candidates = Seq(col1Integer, col1IntegerOther)
-    )
-  }
-
-  test("Duplicate attribute names from several plans") {
-    val nameScope = new NameScope
-
-    nameScope("table1") = Seq(col1Integer, col1IntegerOther)
-    nameScope("table2") = Seq(col1Integer, col1IntegerOther)
-
-    assert(
-      nameScope.getAllAttributes == Seq(
-        col1Integer,
-        col1IntegerOther,
-        col1Integer,
-        col1IntegerOther
-      )
-    )
-
-    nameScope.matchMultipartName(Seq("col1")) == NameTarget(
-      candidates = Seq(
-        col1Integer,
-        col1IntegerOther,
-        col1Integer,
-        col1IntegerOther
-      )
+    checkOnePartNameLookup(
+      nameScope,
+      name = "col5",
+      candidates = Seq.empty
     )
   }
 
   test("Expand star") {
-    val nameScope = new NameScope
-
-    nameScope("table") =
+    var nameScope = new NameScope(

Review Comment:
   Do you really need the var?
   ```suggestion
       val nameScope = new NameScope(
   ```
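
   A possible shape without the `var`, sketched under the assumption that each star-expansion case can live in its own block: give every case a fresh `val` instead of reassigning one mutable reference.
   ```scala
   // Hypothetical restructuring of the test body: one immutable NameScope per case.
   {
     val nameScope = new NameScope(Seq(col1Integer, col2Integer))
     // ... assertions for the first expansion case ...
   }
   {
     val nameScope = new NameScope(Seq(col1Integer))
     // ... assertions for the next case ...
   }
   ```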



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/UnionResolver.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.HashSet
+
+import org.apache.spark.sql.catalyst.analysis.{
+  withPosition,
+  AnsiTypeCoercion,
+  TypeCoercion,
+  TypeCoercionBase
+}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Project, Union}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{DataType, MetadataBuilder}
+
+/**
+ * The [[UnionResolver]] performs [[Union]] operator resolution. This operator has 2+
+ * children. Resolution involves checking and normalizing child output attributes
+ * (data types and nullability).
+ */
+class UnionResolver(
+    resolver: Resolver,
+    expressionResolver: ExpressionResolver,
+    scopes: NameScopeStack)
+    extends TreeNodeResolver[Union, Union] {
+  private val expressionIdAssigner = expressionResolver.getExpressionIdAssigner
+  private val typeCoercion: TypeCoercionBase =
+    if (conf.ansiEnabled) {
+      AnsiTypeCoercion
+    } else {
+      TypeCoercion
+    }
+
+  /**
+   * Resolve the [[Union]] operator:
+   *  - Retrieve old output and child outputs if the operator is already resolved. This is relevant
+   *    for partially resolved subtrees from DataFrame programs.
+   *  - Resolve each child in the context of a) New [[NameScope]] b) New [[ExpressionIdAssigner]]
+   *    mapping. Collect child outputs to coerce them later.
+   *  - Perform projection-based expression ID deduplication if required. This is a hack to stay
+   *    compatible with fixed-point [[Analyzer]].
+   *  - Perform individual output deduplication to handle the distinict union case described in
+   *    [[performIndividualOutputExpressionIdDeduplication]] scaladoc.
+   *  - Validate that child outputs have same length or throw "NUM_COLUMNS_MISMATCH" otherwise.
+   *  - Compute widened data types for child output attributes using
+   *    [[typeCoercion.findWiderTypeForTwo]] or throw "INCOMPATIBLE_COLUMN_TYPE" if coercion fails.
+   *  - Add [[Project]] with [[Cast]] on children needing attribute data type widening.
+   *  - Assert that coerced outputs don't have conflicting expression IDs.
+   *  - Merge transformed outputs: For each column, merge child attributes' types using
+   *    [[StructType.unionLikeMerge]]. Mark column as nullable if any child attribute is.
+   *  - Store merged output in current [[NameScope]].
+   *  - Create a new mapping in [[ExpressionIdAssigner]] using the coerced and validated outputs.
+   *  - Return the resolved [[Union]] with new children.
+   */
+  override def resolve(unresolvedUnion: Union): Union = {
+    val (oldOutput, oldChildOutputs) = if (unresolvedUnion.resolved) {
+      (Some(unresolvedUnion.output), Some(unresolvedUnion.children.map(_.output)))
+    } else {
+      (None, None)
+    }
+
+    val (resolvedChildren, childOutputs) = unresolvedUnion.children.zipWithIndex.map {
+      case (unresolvedChild, childIndex) =>
+        scopes.withNewScope {
+          expressionIdAssigner.withNewMapping(isLeftmostChild = (childIndex == 0)) {
+            val resolvedChild = resolver.resolve(unresolvedChild)
+            (resolvedChild, scopes.top.output)
+          }
+        }
+    }.unzip
+
+    val (projectBasedDeduplicatedChildren, projectBasedDeduplicatedChildOutputs) =
+      performProjectionBasedExpressionIdDeduplication(
+        resolvedChildren,
+        childOutputs,
+        oldChildOutputs
+      )
+    val (deduplicatedChildren, deduplicatedChildOutputs) =
+      performIndividualOutputExpressionIdDeduplication(
+        projectBasedDeduplicatedChildren,
+        projectBasedDeduplicatedChildOutputs
+      )
+
+    val (newChildren, newChildOutputs) = if (needToCoerceChildOutputs(deduplicatedChildOutputs)) {
+      coerceChildOutputs(
+        unresolvedUnion,
+        deduplicatedChildren,
+        deduplicatedChildOutputs,
+        validateAndDeduceTypes(unresolvedUnion, deduplicatedChildOutputs)
+      )
+    } else {
+      (deduplicatedChildren, deduplicatedChildOutputs)
+    }
+
+    ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds(newChildOutputs)
+
+    withPosition(unresolvedUnion) {
+      scopes.overwriteTop(Union.mergeChildOutputs(newChildOutputs))
+    }
+
+    expressionIdAssigner.createMapping(scopes.top.output, oldOutput)
+
+    unresolvedUnion.copy(children = newChildren)
+  }
+
+  /**
+   * Fixed-point [[Analyzer]] uses [[DeduplicateRelations]] rule to handle duplicate expression IDs
+   * in multi-child operator outputs. For [[Union]]s it uses a "projection-based deduplication",
+   * i.e. places another [[Project]] operator with new [[Alias]]es on the right child if duplicate
+   * expression IDs detected. New [[Alias]] "covers" the original attribute with new expression ID.
+   * This is done for all child operators except [[LeafNode]]s.
+   *
+   * We don't need this operation in single-pass [[Resolver]], since we have
+   * [[ExpressionIdAssigner]] for expression ID deduplication, but perform it nevertheless to stay
+   * compatible with fixed-point [[Analyzer]]. Since new outputs are already deduplicated by
+   * [[ExpressionIdAssigner]], we check the _old_ outputs for duplicates and place a [[Project]]
+   * only if old outputs are available (i.e. we are dealing with a resolved subtree from
+   * DataFrame program).
+   */
+  private def performProjectionBasedExpressionIdDeduplication(
+      children: Seq[LogicalPlan],
+      childOutputs: Seq[Seq[Attribute]],
+      oldChildOutputs: Option[Seq[Seq[Attribute]]]
+  ): (Seq[LogicalPlan], Seq[Seq[Attribute]]) = {
+    oldChildOutputs match {
+      case Some(oldChildOutputs) =>
+        val oldExpressionIds = new HashSet[ExprId]
+
+        children
+          .zip(childOutputs)
+          .zip(oldChildOutputs)
+          .map {
+            case ((child: LeafNode, output), _) =>
+              (child, output)
+
+            case ((child, output), oldOutput) =>
+              val oldOutputExpressionIds = new HashSet[ExprId]
+
+              val hasConflicting = oldOutput.exists { oldAttribute =>
+                oldOutputExpressionIds.add(oldAttribute.exprId)
+                oldExpressionIds.contains(oldAttribute.exprId)
+              }
+
+              if (hasConflicting) {
+                val newExpressions = output.map { attribute =>
+                  Alias(attribute, attribute.name)()
+                }
+                (
+                  Project(projectList = newExpressions, child = child),
+                  newExpressions.map(_.toAttribute)
+                )
+              } else {
+                oldExpressionIds.addAll(oldOutputExpressionIds)
+
+                (child, output)
+              }
+          }
+          .unzip
+      case _ =>
+        (children, childOutputs)
+    }
+  }
+
+  /**
+   * Deduplicate expression IDs at the scope of each individual child output. This is necessary to
+   * handle the following case:
+   *
+   * {{{
+   * -- The correct answer is (1, 1), (1, 2). Without deduplication it would be (1, 1), because
+   * -- aggregation would be done only based on the first column.
+   * SELECT
+   *   a, a
+   * FROM
+   *   VALUES (1, 1), (1, 2) AS t1 (a, b)
+   * UNION
+   * SELECT
+   *  a, b
+   * FROM
+   *   VALUES (1, 1), (1, 2) AS t2 (a, b)
+   * }}}
+   *
+   * Putting [[Alias]] introduces a new expression ID for the attribute duplicates in the output. We
+   * also add `__is_duplicate` metadata so that [[AttributeSeq.getCandidatesForResolution]] doesn't
+   * produce conficting candidates when resolving names in the upper [[Project]] - this is

Review Comment:
   ```suggestion
   * produce conflicting candidates when resolving names in the upper [[Project]] - this is
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ViewResolverSuite.scala:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.analysis.resolver
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.resolver.{MetadataResolver, Resolver}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{
+  LocalRelation,
+  LogicalPlan,
+  OneRowRelation,
+  Project,
+  SubqueryAlias,
+  View
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class ViewResolverSuite extends QueryTest with SharedSparkSession {
+  private val catalogName =
+    "spark_catalog"
+  private val col1Integer =
+    AttributeReference(name = "col1", dataType = IntegerType, nullable = 
false)()
+  private val col2String =
+    AttributeReference(name = "col2", dataType = StringType, nullable = 
false)()
+
+  test("Temporary view") {
+    withView("temporary_view") {
+      spark.sql("CREATE TEMPORARY VIEW temporary_view AS SELECT col1, col2 
FROM VALUES (1, 'a');")
+
+      checkViewResolution(
+        "SELECT * FROM temporary_view",
+        expectedChild = Project(
+          projectList = Seq(
+            Alias(Cast(col1Integer, IntegerType).withTimeZone(conf.sessionLocalTimeZone), "col1")(),
+            Alias(Cast(col2String, StringType).withTimeZone(conf.sessionLocalTimeZone), "col2")()
+          ),
+          child = Project(
+            projectList = Seq(col1Integer, col2String),
+            child = LocalRelation(
+              output = Seq(col1Integer, col2String),
+              data = Seq(
+                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst(_)))
+              )
+            )
+          )
+        )
+      )
+    }
+  }
+
+  test("Persistent view") {
+    withView("persistent_view") {
+      spark.sql("CREATE VIEW persistent_view AS SELECT col1, col2 FROM VALUES 
(1, 'a');")
+
+      checkViewResolution(
+        "SELECT * FROM persistent_view",
+        expectedChild = Project(
+          projectList = Seq(
+            Alias(Cast(col1Integer, IntegerType).withTimeZone(conf.sessionLocalTimeZone), "col1")(),
+            Alias(Cast(col2String, StringType).withTimeZone(conf.sessionLocalTimeZone), "col2")()
+          ),
+          child = Project(
+            projectList = Seq(col1Integer, col2String),
+            child = LocalRelation(
+              output = Seq(col1Integer, col2String),
+              data = Seq(
+                InternalRow.fromSeq(Seq(1, "a").map(CatalystTypeConverters.convertToCatalyst(_)))
+              )
+            )
+          )
+        )
+      )
+    }
+  }
+
+  test("Nested views resolution failed") {
+    withTable("table1") {
+      spark.sql("CREATE TABLE table1 (col1 INT, col2 STRING);")
+      withView("view1") {
+        spark.sql("CREATE VIEW view1 AS SELECT col1, col2 FROM table1;")
+        withView("view2") {
+          spark.sql("CREATE VIEW view2 AS SELECT col2, col1 FROM view1;")
+          withView("view3") {
+            spark.sql("CREATE VIEW view3 AS SELECT col1, col2 FROM view2;")
+
+            spark.sql("DROP TABLE table1;")
+
+            checkErrorTableNotFound(
+              exception = intercept[AnalysisException] {
+                checkViewResolution("SELECT * FROM view3")
+              },
+              tableName = "`table1`",
+              queryContext = ExpectedContext(
+                fragment = "view3",
+                start = 14,
+                stop = 18
+              )
+            )
+          }
+        }
+      }
+    }
+  }
+
+  test("Max nested view depth exceeded") {
+    try {
+      spark.sql("CREATE VIEW v0 AS SELECT * FROM VALUES (1);")
+      for (i <- 0 until conf.maxNestedViewDepth) {
+        spark.sql(s"CREATE VIEW v${i + 1} AS SELECT * FROM v${i};")

Review Comment:
   ```suggestion
           spark.sql(s"CREATE VIEW v${i + 1} AS SELECT * FROM v$i;")
   ```
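
   For reference, the interpolation rule behind the suggestion: braces are only required when the interpolated expression is more than a bare identifier.
   ```scala
   val i = 3
   s"v$i"       // bare identifier: no braces needed
   s"v${i + 1}" // compound expression: braces required
   ```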



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/UnionResolver.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.HashSet
+
+import org.apache.spark.sql.catalyst.analysis.{
+  withPosition,
+  AnsiTypeCoercion,
+  TypeCoercion,
+  TypeCoercionBase
+}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Project, Union}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{DataType, MetadataBuilder}
+
+/**
+ * The [[UnionResolver]] performs [[Union]] operator resolution. This operator has 2+
+ * children. Resolution involves checking and normalizing child output attributes
+ * (data types and nullability).
+ */
+class UnionResolver(
+    resolver: Resolver,
+    expressionResolver: ExpressionResolver,
+    scopes: NameScopeStack)
+    extends TreeNodeResolver[Union, Union] {
+  private val expressionIdAssigner = expressionResolver.getExpressionIdAssigner
+  private val typeCoercion: TypeCoercionBase =
+    if (conf.ansiEnabled) {
+      AnsiTypeCoercion
+    } else {
+      TypeCoercion
+    }
+
+  /**
+   * Resolve the [[Union]] operator:
+   *  - Retrieve old output and child outputs if the operator is already resolved. This is relevant
+   *    for partially resolved subtrees from DataFrame programs.
+   *  - Resolve each child in the context of a) New [[NameScope]] b) New [[ExpressionIdAssigner]]
+   *    mapping. Collect child outputs to coerce them later.
+   *  - Perform projection-based expression ID deduplication if required. This is a hack to stay
+   *    compatible with fixed-point [[Analyzer]].
+   *  - Perform individual output deduplication to handle the distinict union case described in

Review Comment:
   ```suggestion
   *  - Perform individual output deduplication to handle the distinct union case described in
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/TimeAddResolver.scala:
##########
@@ -31,17 +31,17 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, TimeAdd}
 class TimeAddResolver(
     expressionResolver: ExpressionResolver,
     timezoneAwareExpressionResolver: TimezoneAwareExpressionResolver)
-  extends TreeNodeResolver[TimeAdd, Expression]
-  with ResolvesExpressionChildren {
+    extends TreeNodeResolver[TimeAdd, Expression]
+    with ResolvesExpressionChildren {

Review Comment:
   Revert it back, see https://github.com/databricks/scala-style-guide?tab=readme-ov-file#spacing-and-indentation
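
   For context, a sketch of the layout the linked guide prescribes, as I read it: 4-space indentation for constructor parameters, 2-space indentation for the `extends`/`with` clause.
   ```scala
   class TimeAddResolver(
       expressionResolver: ExpressionResolver,          // 4 spaces for parameters
       timezoneAwareExpressionResolver: TimezoneAwareExpressionResolver)
     extends TreeNodeResolver[TimeAdd, Expression]      // 2 spaces for extends
     with ResolvesExpressionChildren {
     // ...
   }
   ```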



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/UnionResolver.scala:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis.resolver
+
+import java.util.HashSet
+
+import org.apache.spark.sql.catalyst.analysis.{
+  withPosition,
+  AnsiTypeCoercion,
+  TypeCoercion,
+  TypeCoercionBase
+}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Project, Union}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{DataType, MetadataBuilder}
+
+/**
+ * The [[UnionResolver]] performs [[Union]] operator resolution. This operator has 2+
+ * children. Resolution involves checking and normalizing child output attributes
+ * (data types and nullability).
+ */
+class UnionResolver(
+    resolver: Resolver,
+    expressionResolver: ExpressionResolver,
+    scopes: NameScopeStack)
+    extends TreeNodeResolver[Union, Union] {
+  private val expressionIdAssigner = expressionResolver.getExpressionIdAssigner
+  private val typeCoercion: TypeCoercionBase =
+    if (conf.ansiEnabled) {
+      AnsiTypeCoercion
+    } else {
+      TypeCoercion
+    }
+
+  /**
+   * Resolve the [[Union]] operator:
+   *  - Retrieve old output and child outputs if the operator is already resolved. This is relevant
+   *    for partially resolved subtrees from DataFrame programs.
+   *  - Resolve each child in the context of a) New [[NameScope]] b) New [[ExpressionIdAssigner]]
+   *    mapping. Collect child outputs to coerce them later.
+   *  - Perform projection-based expression ID deduplication if required. This is a hack to stay
+   *    compatible with fixed-point [[Analyzer]].
+   *  - Perform individual output deduplication to handle the distinict union case described in
+   *    [[performIndividualOutputExpressionIdDeduplication]] scaladoc.
+   *  - Validate that child outputs have same length or throw "NUM_COLUMNS_MISMATCH" otherwise.
+   *  - Compute widened data types for child output attributes using
+   *    [[typeCoercion.findWiderTypeForTwo]] or throw "INCOMPATIBLE_COLUMN_TYPE" if coercion fails.
+   *  - Add [[Project]] with [[Cast]] on children needing attribute data type widening.
+   *  - Assert that coerced outputs don't have conflicting expression IDs.
+   *  - Merge transformed outputs: For each column, merge child attributes' types using
+   *    [[StructType.unionLikeMerge]]. Mark column as nullable if any child attribute is.
+   *  - Store merged output in current [[NameScope]].
+   *  - Create a new mapping in [[ExpressionIdAssigner]] using the coerced and validated outputs.
+   *  - Return the resolved [[Union]] with new children.
+   */
+  override def resolve(unresolvedUnion: Union): Union = {
+    val (oldOutput, oldChildOutputs) = if (unresolvedUnion.resolved) {
+      (Some(unresolvedUnion.output), Some(unresolvedUnion.children.map(_.output)))
+    } else {
+      (None, None)
+    }
+
+    val (resolvedChildren, childOutputs) = unresolvedUnion.children.zipWithIndex.map {
+      case (unresolvedChild, childIndex) =>
+        scopes.withNewScope {
+          expressionIdAssigner.withNewMapping(isLeftmostChild = (childIndex == 0)) {
+            val resolvedChild = resolver.resolve(unresolvedChild)
+            (resolvedChild, scopes.top.output)
+          }
+        }
+    }.unzip
+
+    val (projectBasedDeduplicatedChildren, projectBasedDeduplicatedChildOutputs) =
+      performProjectionBasedExpressionIdDeduplication(
+        resolvedChildren,
+        childOutputs,
+        oldChildOutputs
+      )
+    val (deduplicatedChildren, deduplicatedChildOutputs) =
+      performIndividualOutputExpressionIdDeduplication(
+        projectBasedDeduplicatedChildren,
+        projectBasedDeduplicatedChildOutputs
+      )
+
+    val (newChildren, newChildOutputs) = if (needToCoerceChildOutputs(deduplicatedChildOutputs)) {
+      coerceChildOutputs(
+        unresolvedUnion,
+        deduplicatedChildren,
+        deduplicatedChildOutputs,
+        validateAndDeduceTypes(unresolvedUnion, deduplicatedChildOutputs)
+      )
+    } else {
+      (deduplicatedChildren, deduplicatedChildOutputs)
+    }
+
+    ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds(newChildOutputs)
+
+    withPosition(unresolvedUnion) {
+      scopes.overwriteTop(Union.mergeChildOutputs(newChildOutputs))
+    }
+
+    expressionIdAssigner.createMapping(scopes.top.output, oldOutput)
+
+    unresolvedUnion.copy(children = newChildren)
+  }
+
+  /**
+   * Fixed-point [[Analyzer]] uses [[DeduplicateRelations]] rule to handle duplicate expression IDs
+   * in multi-child operator outputs. For [[Union]]s it uses a "projection-based deduplication",
+   * i.e. places another [[Project]] operator with new [[Alias]]es on the right child if duplicate
+   * expression IDs detected. New [[Alias]] "covers" the original attribute with new expression ID.
+   * This is done for all child operators except [[LeafNode]]s.
+   *
+   * We don't need this operation in single-pass [[Resolver]], since we have
+   * [[ExpressionIdAssigner]] for expression ID deduplication, but perform it nevertheless to stay
+   * compatible with fixed-point [[Analyzer]]. Since new outputs are already deduplicated by
+   * [[ExpressionIdAssigner]], we check the _old_ outputs for duplicates and place a [[Project]]
+   * only if old outputs are available (i.e. we are dealing with a resolved subtree from
+   * DataFrame program).
+   */
+  private def performProjectionBasedExpressionIdDeduplication(
+      children: Seq[LogicalPlan],
+      childOutputs: Seq[Seq[Attribute]],
+      oldChildOutputs: Option[Seq[Seq[Attribute]]]
+  ): (Seq[LogicalPlan], Seq[Seq[Attribute]]) = {
+    oldChildOutputs match {
+      case Some(oldChildOutputs) =>
+        val oldExpressionIds = new HashSet[ExprId]
+
+        children
+          .zip(childOutputs)
+          .zip(oldChildOutputs)
+          .map {
+            case ((child: LeafNode, output), _) =>
+              (child, output)
+
+            case ((child, output), oldOutput) =>
+              val oldOutputExpressionIds = new HashSet[ExprId]
+
+              val hasConflicting = oldOutput.exists { oldAttribute =>
+                oldOutputExpressionIds.add(oldAttribute.exprId)
+                oldExpressionIds.contains(oldAttribute.exprId)
+              }
+
+              if (hasConflicting) {
+                val newExpressions = output.map { attribute =>
+                  Alias(attribute, attribute.name)()
+                }
+                (
+                  Project(projectList = newExpressions, child = child),
+                  newExpressions.map(_.toAttribute)
+                )
+              } else {
+                oldExpressionIds.addAll(oldOutputExpressionIds)
+
+                (child, output)
+              }
+          }
+          .unzip
+      case _ =>
+        (children, childOutputs)
+    }
+  }
+
+  /**
+   * Deduplicate expression IDs at the scope of each individual child output. This is necessary to
+   * handle the following case:
+   *
+   * {{{
+   * -- The correct answer is (1, 1), (1, 2). Without deduplication it would be (1, 1), because
+   * -- aggregation would be done only based on the first column.
+   * SELECT
+   *   a, a
+   * FROM
+   *   VALUES (1, 1), (1, 2) AS t1 (a, b)
+   * UNION
+   * SELECT
+   *  a, b
+   * FROM
+   *   VALUES (1, 1), (1, 2) AS t2 (a, b)
+   * }}}
+   *
+   * Putting [[Alias]] introduces a new expression ID for the attribute duplicates in the output. We
+   * also add `__is_duplicate` metadata so that [[AttributeSeq.getCandidatesForResolution]] doesn't
+   * produce conficting candidates when resolving names in the upper [[Project]] - this is
+   * technically still the same attribute.
+   *
+   * Probably there's a better way to do that, but we want to stay compatible with the fixed-point
+   * [[Analyzer]].
+   *
+   * See SPARK-37865 for more details.
+   */
+  private def performIndividualOutputExpressionIdDeduplication(
+      children: Seq[LogicalPlan],
+      childOutputs: Seq[Seq[Attribute]]
+  ): (Seq[LogicalPlan], Seq[Seq[Attribute]]) = {
+    children
+      .zip(childOutputs)
+      .map {
+        case (child, childOutput) =>
+          var outputChanged = false
+
+          val expressionIds = new HashSet[ExprId]
+          val newOutput = childOutput.map { attribute =>
+            if (expressionIds.contains(attribute.exprId)) {
+              outputChanged = true
+
+              val newMetadata = new MetadataBuilder()
+                .withMetadata(attribute.metadata)
+                .putNull("__is_duplicate")
+                .build()
+              Alias(attribute, attribute.name)(explicitMetadata = Some(newMetadata))
+            } else {
+              expressionIds.add(attribute.exprId)
+
+              attribute
+            }
+          }
+
+          if (outputChanged) {
+            (Project(projectList = newOutput, child = child), newOutput.map(_.toAttribute))
+          } else {
+            (child, childOutput)
+          }
+      }
+      .unzip
+  }
+
+  /**
+   * Check if we need to coerce child output attributes to wider types. We need to do this if:
+   * - Output length differs between children. We will throw an appropriate error later during type
+   *   coercion with more diagnostics.
+   * - Output data types differ between children. We don't care about nullability for type coercion,
+   *   it will be correctly assigned later by [[Union.mergeChildOutputs]].
+   */
+  private def needToCoerceChildOutputs(childOutputs: Seq[Seq[Attribute]]): Boolean = {
+    val firstChildOutput = childOutputs.head
+    childOutputs.tail.exists { childOutput =>
+      childOutput.length != firstChildOutput.length ||
+      childOutput.zip(firstChildOutput).exists {
+        case (lhsAttribute, rhsAttribute) =>
+          !DataType.equalsStructurally(
+            lhsAttribute.dataType,
+            rhsAttribute.dataType,
+            ignoreNullability = true
+          )
+      }
+    }
+  }
+
+  /**
+   * Returns a sequence of data types representing the widened data types for each column:
+   *  - Validates that the number of columns in each child of the `Union` operator are equal.
+   *  - Validates that the data types of columns can be widened to a common type.
+   *  - Deduces the widened data types for each column.
+   */
+  private def validateAndDeduceTypes(
+      unresolvedUnion: Union,
+      childOutputs: Seq[Seq[Attribute]]): Seq[DataType] = {
+    val childDataTypes = childOutputs.map(attributes => attributes.map(attr => attr.dataType))
+
+    val expectedNumColumns = childDataTypes.head.length
+
+    childDataTypes.zipWithIndex.tail.foldLeft(childDataTypes.head) {
+      case (widenedTypes, (childColumnTypes, childIndex)) =>
+        if (childColumnTypes.length != expectedNumColumns) {
+          throwNumColumnsMismatch(
+            expectedNumColumns,
+            childColumnTypes,
+            childIndex,
+            unresolvedUnion
+          )
+        }
+
+        widenedTypes.zip(childColumnTypes).zipWithIndex.map {
+          case ((widenedColumnType, columnTypeForCurrentRow), columnIndex) =>
+            typeCoercion.findWiderTypeForTwo(widenedColumnType, columnTypeForCurrentRow).getOrElse {
+              throwIncompatibleColumnTypeError(
+                unresolvedUnion,
+                columnIndex,
+                childIndex + 1,
+                widenedColumnType,
+                columnTypeForCurrentRow
+              )
+            }
+        }
+    }
+  }
+
+  /**
+   * Coerce `childOutputs` to the previously calculated `widenedTypes`. If the data types for
+   * child output has changed, we have to add a [[Project]] operator with a [[Cast]] to the new
+   * type.
+   */
+  private def coerceChildOutputs(
+      unresolvedUnion: Union,

Review Comment:
   Is it used somewhere? If not, please, remove it.



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ExpressionIdAssignerSuite.scala:
##########
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.analysis.resolver
+
+import java.util.IdentityHashMap
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.resolver.{ExpressionIdAssigner, Resolver}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class ExpressionIdAssignerSuite extends QueryTest with SharedSparkSession {
+  private val col1Integer = AttributeReference(name = "col1", dataType = 
IntegerType)()
+  private val col1IntegerAlias = Alias(col1Integer, "a")()
+  private val col2Integer = AttributeReference(name = "col2", dataType = 
IntegerType)()
+  private val col2IntegerAlias = Alias(col2Integer, "b")()
+  private val col3Integer = AttributeReference(name = "col3", dataType = 
IntegerType)()
+
+  private val CONSTRAINTS_VALIDATED = TreeNodeTag[Boolean]("constraints_validated")
+
+  test("Mapping is not created") {
+    val assigner = new ExpressionIdAssigner
+
+    intercept[SparkException] {
+      assigner.mapExpression(col1Integer)
+    }
+
+    assigner.withNewMapping() {
+      assigner.withNewMapping() {
+        intercept[SparkException] {
+          assigner.mapExpression(col1Integer)
+        }
+      }
+    }
+  }
+
+  test("Mapping is created twice") {
+    val assigner = new ExpressionIdAssigner
+
+    intercept[SparkException] {
+      assigner.createMapping()
+      assigner.createMapping()
+    }
+
+    assigner.withNewMapping() {
+      assigner.createMapping()
+
+      assigner.withNewMapping() {
+        assigner.createMapping()
+
+        intercept[SparkException] {
+          assigner.createMapping()
+        }
+      }
+
+      intercept[SparkException] {
+        assigner.createMapping()
+      }
+    }
+  }
+
+  test("Create mapping with new output and old output with different length") {
+    val assigner = new ExpressionIdAssigner
+
+    intercept[SparkException] {
+      assigner.createMapping(
+        newOutput = Seq(col1Integer.newInstance()),
+        oldOutput = Some(Seq(col1Integer, col2Integer))
+      )
+    }
+  }
+
+  test("Left branch: Single AttributeReference") {
+    val assigner = new ExpressionIdAssigner
+
+    assigner.createMapping()
+
+    val col1IntegerMapped = assigner.mapExpression(col1Integer)
+    assert(col1IntegerMapped.isInstanceOf[AttributeReference])
+    assert(col1IntegerMapped.exprId != col1Integer.exprId)
+
+    val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+    assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerReferenced.exprId == col1IntegerMapped.exprId)
+
+    val col1IntegerMappedReferenced = assigner.mapExpression(col1IntegerMapped)
+    assert(col1IntegerMappedReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerMappedReferenced.exprId == col1IntegerMapped.exprId)
+  }
+
+  test("Right branch: Single AttributeReference") {
+    val assigner = new ExpressionIdAssigner
+    assigner.withNewMapping() {
+      assigner.createMapping()
+
+      val col1IntegerMapped = assigner.mapExpression(col1Integer)
+      assert(col1IntegerMapped.isInstanceOf[AttributeReference])
+      assert(col1IntegerMapped.exprId != col1Integer.exprId)
+
+      val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+      assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerReferenced.exprId == col1IntegerMapped.exprId)
+
+      val col1IntegerMappedReferenced = assigner.mapExpression(col1IntegerMapped)
+      assert(col1IntegerMappedReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerMappedReferenced.exprId == col1IntegerMapped.exprId)
+    }
+  }
+
+  test("Left branch: Single Alias") {
+    val assigner = new ExpressionIdAssigner
+
+    assigner.createMapping()
+
+    val col1IntegerAliasMapped = assigner.mapExpression(col1IntegerAlias)
+    assert(col1IntegerAliasMapped.isInstanceOf[Alias])
+    assert(col1IntegerAliasMapped.exprId == col1IntegerAlias.exprId)
+
+    val col1IntegerAliasReferenced = assigner.mapExpression(col1IntegerAlias.toAttribute)
+    assert(col1IntegerAliasReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerAliasReferenced.exprId == col1IntegerAliasMapped.exprId)
+
+    val col1IntegerAliasMappedReferenced =
+      assigner.mapExpression(col1IntegerAliasMapped.toAttribute)
+    assert(col1IntegerAliasMappedReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerAliasMappedReferenced.exprId == col1IntegerAliasMapped.exprId)
+
+    val col1IntegerAliasMappedAgain = assigner.mapExpression(col1IntegerAlias)
+    assert(col1IntegerAliasMappedAgain.isInstanceOf[Alias])
+    assert(col1IntegerAliasMappedAgain.exprId != col1IntegerAlias.exprId)
+    assert(col1IntegerAliasMappedAgain.exprId != col1IntegerAliasMapped.exprId)
+  }
+
+  test("Right branch: Single Alias") {
+    val assigner = new ExpressionIdAssigner
+    assigner.withNewMapping() {
+      assigner.createMapping()
+
+      val col1IntegerAliasMapped = assigner.mapExpression(col1IntegerAlias)
+      assert(col1IntegerAliasMapped.isInstanceOf[Alias])
+      assert(col1IntegerAliasMapped.exprId != col1IntegerAlias.exprId)
+
+      val col1IntegerAliasReferenced = assigner.mapExpression(col1IntegerAlias.toAttribute)
+      assert(col1IntegerAliasReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerAliasReferenced.exprId == col1IntegerAliasMapped.exprId)
+
+      val col1IntegerAliasMappedReferenced =
+        assigner.mapExpression(col1IntegerAliasMapped.toAttribute)
+      assert(col1IntegerAliasMappedReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerAliasMappedReferenced.exprId == col1IntegerAliasMapped.exprId)
+
+      val col1IntegerAliasMappedAgain = assigner.mapExpression(col1IntegerAlias)
+      assert(col1IntegerAliasMappedAgain.isInstanceOf[Alias])
+      assert(col1IntegerAliasMappedAgain.exprId != col1IntegerAlias.exprId)
+      assert(col1IntegerAliasMappedAgain.exprId != col1IntegerAliasMapped.exprId)
+    }
+  }
+
+  test("Left branch: Create mapping with new output") {
+    val assigner = new ExpressionIdAssigner
+
+    assigner.createMapping(newOutput = Seq(col1Integer, col2Integer))
+
+    val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+    assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerReferenced.exprId == col1Integer.exprId)
+
+    val col2IntegerReferenced = assigner.mapExpression(col2Integer)
+    assert(col2IntegerReferenced.isInstanceOf[AttributeReference])
+    assert(col2IntegerReferenced.exprId == col2Integer.exprId)
+
+    val col2IntegerAliasMapped = assigner.mapExpression(col2IntegerAlias)
+    assert(col2IntegerAliasMapped.isInstanceOf[Alias])
+    assert(col2IntegerAliasMapped.exprId == col2IntegerAlias.exprId)
+    assert(col2IntegerAliasMapped.exprId != col2Integer.exprId)
+
+    val col3IntegerMapped = assigner.mapExpression(col3Integer)
+    assert(col3IntegerMapped.isInstanceOf[AttributeReference])
+    assert(col3IntegerMapped.exprId != col3Integer.exprId)
+  }
+
+  test("Right branch: Create mapping with new output") {
+    val assigner = new ExpressionIdAssigner
+    assigner.withNewMapping() {
+      assigner.createMapping(newOutput = Seq(col1Integer, col2Integer))
+
+      val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+      assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerReferenced.exprId == col1Integer.exprId)
+
+      val col2IntegerReferenced = assigner.mapExpression(col2Integer)
+      assert(col2IntegerReferenced.isInstanceOf[AttributeReference])
+      assert(col2IntegerReferenced.exprId == col2Integer.exprId)
+
+      val col2IntegerAliasMapped = assigner.mapExpression(col2IntegerAlias)
+      assert(col2IntegerAliasMapped.isInstanceOf[Alias])
+      assert(col2IntegerAliasMapped.exprId != col2IntegerAlias.exprId)
+      assert(col2IntegerAliasMapped.exprId != col2Integer.exprId)
+
+      val col3IntegerMapped = assigner.mapExpression(col3Integer)
+      assert(col3IntegerMapped.isInstanceOf[AttributeReference])
+      assert(col3IntegerMapped.exprId != col3Integer.exprId)
+    }
+  }
+
+  test("Left branch: Create mapping with new output and old output") {
+    val assigner = new ExpressionIdAssigner
+
+    val col1IntegerNew = col1Integer.newInstance()
+    assert(col1IntegerNew.exprId != col1Integer.exprId)
+
+    val col2IntegerNew = col2Integer.newInstance()
+    assert(col2IntegerNew.exprId != col2Integer.exprId)
+
+    assigner.createMapping(
+      newOutput = Seq(col1IntegerNew, col2IntegerNew),
+      oldOutput = Some(Seq(col1Integer, col2Integer))
+    )
+
+    val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+    assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerReferenced.exprId == col1IntegerNew.exprId)
+
+    val col1IntegerNewReferenced = assigner.mapExpression(col1IntegerNew)
+    assert(col1IntegerNewReferenced.isInstanceOf[AttributeReference])
+    assert(col1IntegerNewReferenced.exprId == col1IntegerNew.exprId)
+
+    val col2IntegerReferenced = assigner.mapExpression(col2Integer)
+    assert(col2IntegerReferenced.isInstanceOf[AttributeReference])
+    assert(col2IntegerReferenced.exprId == col2IntegerNew.exprId)
+
+    val col2IntegerNewReferenced = assigner.mapExpression(col2IntegerNew)
+    assert(col2IntegerNewReferenced.isInstanceOf[AttributeReference])
+    assert(col2IntegerNewReferenced.exprId == col2IntegerNew.exprId)
+
+    val col2IntegerAliasMapped = assigner.mapExpression(col2IntegerAlias)
+    assert(col2IntegerAliasMapped.isInstanceOf[Alias])
+    assert(col2IntegerAliasMapped.exprId == col2IntegerAlias.exprId)
+    assert(col2IntegerAliasMapped.exprId != col2Integer.exprId)
+    assert(col2IntegerAliasMapped.exprId != col2IntegerNew.exprId)
+
+    val col3IntegerMapped = assigner.mapExpression(col3Integer)
+    assert(col3IntegerMapped.isInstanceOf[AttributeReference])
+    assert(col3IntegerMapped.exprId != col3Integer.exprId)
+  }
+
+  test("Right branch: Create mapping with new output and old output") {
+    val assigner = new ExpressionIdAssigner
+    assigner.withNewMapping() {
+      val col1IntegerNew = col1Integer.newInstance()
+      assert(col1IntegerNew.exprId != col1Integer.exprId)
+
+      val col2IntegerNew = col2Integer.newInstance()
+      assert(col2IntegerNew.exprId != col2Integer.exprId)
+
+      assigner.createMapping(
+        newOutput = Seq(col1IntegerNew, col2IntegerNew),
+        oldOutput = Some(Seq(col1Integer, col2Integer))
+      )
+
+      val col1IntegerReferenced = assigner.mapExpression(col1Integer)
+      assert(col1IntegerReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerReferenced.exprId == col1IntegerNew.exprId)
+
+      val col1IntegerNewReferenced = assigner.mapExpression(col1IntegerNew)
+      assert(col1IntegerNewReferenced.isInstanceOf[AttributeReference])
+      assert(col1IntegerNewReferenced.exprId == col1IntegerNew.exprId)
+
+      val col2IntegerReferenced = assigner.mapExpression(col2Integer)
+      assert(col2IntegerReferenced.isInstanceOf[AttributeReference])
+      assert(col2IntegerReferenced.exprId == col2IntegerNew.exprId)
+
+      val col2IntegerNewReferenced = assigner.mapExpression(col2IntegerNew)
+      assert(col2IntegerNewReferenced.isInstanceOf[AttributeReference])
+      assert(col2IntegerNewReferenced.exprId == col2IntegerNew.exprId)
+
+      val col2IntegerAliasMapped = assigner.mapExpression(col2IntegerAlias)
+      assert(col2IntegerAliasMapped.isInstanceOf[Alias])
+      assert(col2IntegerAliasMapped.exprId != col2IntegerAlias.exprId)
+      assert(col2IntegerAliasMapped.exprId != col2Integer.exprId)
+      assert(col2IntegerAliasMapped.exprId != col2IntegerNew.exprId)
+
+      val col3IntegerMapped = assigner.mapExpression(col3Integer)
+      assert(col3IntegerMapped.isInstanceOf[AttributeReference])
+      assert(col3IntegerMapped.exprId != col3Integer.exprId)
+    }
+  }
+
+  test("Several layers") {
+    val assigner = new ExpressionIdAssigner
+    val literalAlias1 = Alias(Literal(1), "a")()
+    val literalAlias2 = Alias(Literal(2), "b")()
+
+    val output1 = assigner.withNewMapping() {
+      val output1 = assigner.withNewMapping() {
+        assigner.createMapping()
+
+        Seq(
+          assigner.mapExpression(col1Integer).toAttribute,
+          assigner.mapExpression(col2Integer).toAttribute
+        )
+      }
+
+      val output2 = assigner.withNewMapping() {
+        val col1IntegerNew = col1Integer.newInstance()
+        val col2IntegerNew = col2Integer.newInstance()
+
+        assigner.createMapping(newOutput = Seq(col1IntegerNew, col2IntegerNew))
+
+        Seq(
+          assigner.mapExpression(col1IntegerNew).toAttribute,
+          assigner.mapExpression(col2IntegerNew).toAttribute
+        )
+      }
+
+      val output3 = assigner.withNewMapping() {
+        val col1IntegerNew = col1Integer.newInstance()
+        val col2IntegerNew = col2Integer.newInstance()
+
+        assigner.createMapping(
+          newOutput = Seq(col1IntegerNew, col2IntegerNew),
+          oldOutput = Some(Seq(col1Integer, col2Integer))
+        )
+
+        Seq(
+          assigner.mapExpression(col1Integer).toAttribute,
+          assigner.mapExpression(col2Integer).toAttribute
+        )
+      }
+
+      output1.zip(output2).zip(output3).zip(Seq(col1Integer, col2Integer)).foreach {
+        case (((attribute1, attribute2), attribute3), originalAttribute) =>
+          assert(attribute1.exprId != originalAttribute.exprId)
+          assert(attribute2.exprId != originalAttribute.exprId)
+          assert(attribute3.exprId != originalAttribute.exprId)
+          assert(attribute1.exprId != attribute2.exprId)
+          assert(attribute1.exprId != attribute3.exprId)
+          assert(attribute2.exprId != attribute3.exprId)
+      }
+
+      assigner.createMapping(newOutput = output2)
+
+      val literalAlias1Remapped = assigner.mapExpression(literalAlias1)
+      assert(literalAlias1Remapped.isInstanceOf[Alias])
+      assert(literalAlias1Remapped.exprId != literalAlias1.exprId)
+
+      val literalAlias2Remapped = assigner.mapExpression(literalAlias2)
+      assert(literalAlias2Remapped.isInstanceOf[Alias])
+      assert(literalAlias2Remapped.exprId != literalAlias2.exprId)
+
+      Seq(literalAlias1Remapped.toAttribute, literalAlias2Remapped.toAttribute) ++ output2
+    }
+
+    val output2 = assigner.withNewMapping() {
+      assigner.createMapping()
+
+      val literalAlias1Remapped = assigner.mapExpression(literalAlias1)
+      assert(literalAlias1Remapped.isInstanceOf[Alias])
+      assert(literalAlias1Remapped.exprId != literalAlias1.exprId)
+
+      val literalAlias2Remapped = assigner.mapExpression(literalAlias2)
+      assert(literalAlias2Remapped.isInstanceOf[Alias])
+      assert(literalAlias2Remapped.exprId != literalAlias2.exprId)
+
+      Seq(literalAlias1Remapped.toAttribute, literalAlias2Remapped.toAttribute)
+    }
+
+    output1.zip(output2).foreach {
+      case (aliasReference1, aliasReference2) =>
+        assert(aliasReference1.exprId != aliasReference2.exprId)
+    }
+
+    assigner.createMapping(newOutput = output1)
+
+    val aliasReferences = output1.map { aliasReference =>
+      assigner.mapExpression(aliasReference)
+    }
+
+    aliasReferences.zip(output1).zip(output2).foreach {
+      case ((aliasReference, aliasReference1), aliasReference2) =>
+        assert(aliasReference.exprId == aliasReference1.exprId)
+        assert(aliasReference.exprId != aliasReference2.exprId)
+    }
+
+    aliasReferences.map(_.toAttribute)
+  }
+
+  test("Simple select") {
+    checkExpressionIdAssignment(
+      spark
+        .sql("""
+        SELECT
+          col1, 1 AS a, col1, 1 AS a, col2, 2 AS b, col3, 3 AS c
+        FROM
+          VALUES (1, 2, 3)
+        """)
+        .queryExecution
+        .analyzed
+    )
+  }
+
+  test("Simple select, aliases referenced") {
+    checkExpressionIdAssignment(
+      spark
+        .sql("""
+        SELECT
+          col3, c, col2, b, col1, a, col1, a
+        FROM (
+          SELECT
+            col1, 1 AS a, col1, col2, 2 AS b, col3, 3 AS c
+          FROM
+            VALUES (1, 2, 3)
+        )""")
+        .queryExecution
+        .analyzed
+    )
+  }
+
+  test("Simple select, aliases referenced and rewritten") {
+    checkExpressionIdAssignment(
+      spark
+        .sql("""
+        SELECT
+          col3, 3 AS c, col2, 2 AS b, col1, 1 AS a, col1, 1 AS a
+        FROM (
+          SELECT
+            col2, b, col1, a, col1, a, col3, c
+          FROM (
+            SELECT
+              col1, 1 AS a, col1, col2, 2 AS b, col3, 3 AS c
+            FROM
+              VALUES (1, 2, 3)
+          )
+        )""")
+        .queryExecution
+        .analyzed
+    )
+  }
+
+  test("SQL Union, same table") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+
+      checkExpressionIdAssignment(
+        spark
+          .sql("""
+          SELECT * FROM (
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            UNION ALL
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            UNION ALL
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+          )""")
+          .queryExecution
+          .analyzed
+      )
+    }
+  }
+
+  test("SQL Union, different tables") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+      withTable("t2") {
+        spark.sql("CREATE TABLE t2 (col1 INT, col2 INT, col3 INT)")
+        withTable("t3") {
+          spark.sql("CREATE TABLE t3 (col1 INT, col2 INT, col3 INT)")
+
+          checkExpressionIdAssignment(
+            spark
+              .sql("""
+          SELECT * FROM (
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            UNION ALL
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t2
+            UNION ALL
+            SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t3
+          )""")
+              .queryExecution
+              .analyzed
+          )
+        }
+      }
+    }
+  }
+
+  test("SQL Union, same table, several layers") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+
+      checkExpressionIdAssignment(
+        spark
+          .sql("""
+          SELECT * FROM (
+            SELECT * FROM (
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+              UNION ALL
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            )
+            UNION ALL
+            SELECT * FROM (
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+              UNION ALL
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            )
+          )
+          UNION ALL
+          SELECT * FROM (
+            SELECT * FROM (
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+              UNION ALL
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            )
+            UNION ALL
+            SELECT * FROM (
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+              UNION ALL
+              SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1
+            )
+          )""")
+          .queryExecution
+          .analyzed
+      )
+    }
+  }
+
+  test("DataFrame Union, same table") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+
+      val df = spark.sql("SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1")
+      checkExpressionIdAssignment(df.union(df).queryExecution.analyzed)
+    }
+  }
+
+  test("DataFrame Union, different tables") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+
+      withTable("t2") {
+        spark.sql("CREATE TABLE t2 (col1 INT, col2 INT, col3 INT)")
+
+        val df1 = spark.sql("SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1")
+        val df2 = spark.sql("SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t2")
+        checkExpressionIdAssignment(df1.union(df2).queryExecution.analyzed)
+      }
+    }
+  }
+
+  test("DataFrame Union, same table, several layers") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (col1 INT, col2 INT, col3 INT)")
+
+      val df = spark.sql("SELECT col1, 1 AS a, col2, 2 AS b, col3, 3 AS c FROM t1")
+      checkExpressionIdAssignment(
+        df.union(df)
+          .select("*")
+          .union(df.union(df).select("*"))
+          .union(df.union(df).select("*"))
+          .queryExecution
+          .analyzed
+      )
+    }
+  }
+
+  test("The case of output attribute names is preserved") {
+    val df = spark.sql("SELECT col1, COL1, cOl2, CoL2 FROM VALUES (1, 2)")
+
+    checkExpressionIdAssignment(df.queryExecution.analyzed)
+  }
+
+  test("The metadata of output attributes is preserved") {
+    val metadata1 = new MetadataBuilder().putString("m1", "1").putString("m2", "2").build()
+    val metadata2 = new MetadataBuilder().putString("m2", "3").putString("m3", "4").build()
+    val schema = new StructType().add("a", IntegerType, nullable = true, metadata = metadata2)
+    val df =
+      spark.sql("SELECT col1 FROM VALUES (1)").select(col("col1").as("a", metadata1)).to(schema)
+
+    checkExpressionIdAssignment(df.queryExecution.analyzed)
+  }
+
+  test("Alias with the same ID in multiple Projects") {
+    val t = LocalRelation.fromExternalRows(
+      Seq("a".attr.int, "b".attr.int),
+      0.until(10).map(_ => Row(1, 2))
+    )
+    val alias = ("a".attr + 1).as("a")
+    val plan = t.select(alias).select(alias).select(alias)
+
+    checkExpressionIdAssignment(plan)
+  }
+
+  test("Raw union, same table") {
+    val t = LocalRelation.fromExternalRows(
+      Seq("col1".attr.int, "col2".attr.int),
+      0.until(10).map(_ => Row(1, 2))
+    )
+    val query = t.select("col1".attr, Literal(1).as("a"), "col2".attr, Literal(2).as("b"))
+    val plan = query.union(query)
+
+    checkExpressionIdAssignment(plan)
+  }
+
+  test("DataFrame with binary arithmetic re-resolved") {
+    val result = withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
+      val df = spark.sql("SELECT col1 + col2 AS a FROM VALUES (1, 2)")
+      df.union(df)
+    }
+    checkAnswer(result, Array(Row(3), Row(3)))
+  }
+
+  test("Leftmost branch attributes are not regenerated in DataFrame") {
+    withTable("tbl1") {
+      spark.sql("CREATE TABLE tbl1 (col1 INT, col2 INT)")
+      spark.sql("INSERT INTO tbl1 VALUES (0, 1), (2, 3)")
+
+      var result = withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
+        val df1 = spark.table("tbl1")
+        df1.select(col("col1"), col("col2")).filter(df1("col1") === 0)
+      }
+      checkAnswer(result, Array(Row(0, 1)))
+
+      result = withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
+        val df1 = spark.table("tbl1").select(col("col1").as("a"), col("col2").as("b"))
+        df1.select(col("a"), col("b")).filter(df1("a") === 0)
+      }
+      checkAnswer(result, Array(Row(0, 1)))
+
+      result = withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
+        val df1 = spark.table("tbl1")
+        df1.union(df1).filter(df1("col1") === 0)
+      }
+      checkAnswer(result, Array(Row(0, 1), Row(0, 1)))
+    }
+  }
+
+  private def checkExpressionIdAssignment(originalPlan: LogicalPlan): Unit = {
+    val resolver = new Resolver(
+      catalogManager = spark.sessionState.catalogManager,
+      extensions = spark.sessionState.analyzer.singlePassResolverExtensions
+    )
+    val newPlan = resolver.resolve(originalPlan)
+
+    checkPlanConstraints(originalPlan, newPlan, leftmostBranch = true)
+    checkSubtreeConstraints(originalPlan, newPlan, leftmostBranch = true)
+  }
+
+  private def checkPlanConstraints(
+      originalPlan: LogicalPlan,
+      newPlan: LogicalPlan,
+      leftmostBranch: Boolean): Unit = {
+    originalPlan.children.zip(newPlan.children).zipWithIndex.foreach {
+      case ((originalChild, newChild), index) =>
+        checkPlanConstraints(originalChild, newChild, leftmostBranch && index == 0)
+    }
+
+    if (originalPlan.children.length > 1) {
+      ExpressionIdAssigner.assertOutputsHaveNoConflictingExpressionIds(
+        newPlan.children.map(_.output)
+      )
+      originalPlan.children.zip(newPlan.children).zipWithIndex.foreach {
+        case ((oldChild, newChild), index) =>
+          checkSubtreeConstraints(oldChild, newChild, leftmostBranch && index == 0)
+      }
+    }
+  }
+
+  private def checkSubtreeConstraints(
+      originalPlan: LogicalPlan,
+      newPlan: LogicalPlan,
+      leftmostBranch: Boolean): Unit = {
+    val originalOperators = new ArrayBuffer[LogicalPlan]
+    originalPlan.foreach {
+      case operator if !operator.getTagValue(CONSTRAINTS_VALIDATED).getOrElse(false) =>
+        originalOperators.append(operator)
+      case _ =>
+    }
+
+    val newOperators = new ArrayBuffer[LogicalPlan]
+
+    val leftmostOperators = new IdentityHashMap[LogicalPlan, Boolean]
+    if (leftmostBranch) {
+      leftmostOperators.put(newPlan, true)
+    }
+
+    newPlan.foreach {
+      case operator if !operator.getTagValue(CONSTRAINTS_VALIDATED).getOrElse(false) =>
+        newOperators.append(operator)
+
+        if (operator.children.nonEmpty && leftmostOperators.containsKey(operator)) {
+          leftmostOperators.put(operator.children.head, true)
+        }
+      case _ =>
+    }
+
+    val attributesByName = new HashMap[String, ArrayBuffer[AttributeReference]]
+    val aliasesByName = new HashMap[String, ArrayBuffer[Alias]]
+    originalOperators
+      .zip(newOperators)
+      .collect {
+        case (originalProject: Project, newProject: Project) =>
+          if (originalProject.resolved) {
+            (originalProject.projectList, newProject.projectList, newProject)
+          } else {
+            (newProject.projectList, newProject)
+          }
+        case (originalOperator: LogicalPlan, newOperator: LogicalPlan) =>
+          if (originalOperator.resolved) {
+            (originalOperator.output, newOperator.output, newOperator)
+          } else {
+            (newOperator.output, newOperator)
+          }
+      }
+      .foreach {
+        case (
+            originalExpressions: Seq[_],
+            newExpressions: Seq[_],
+            newOperator: LogicalPlan
+            ) =>
+          originalExpressions.zip(newExpressions).zipWithIndex.foreach {
+            case (
+                (originalAttribute: AttributeReference, newAttribute: AttributeReference),
+                index
+                ) =>
+              if (leftmostOperators.containsKey(newOperator)) {
+                assert(
+                  originalAttribute.exprId == newAttribute.exprId,
+                  s"Attribute at $index was regenerated: $originalAttribute, 
$newAttribute"
+                )
+              } else {
+                assert(
+                  originalAttribute.exprId != newAttribute.exprId,
+                  s"Attribute at $index was not regenerated: 
$originalAttribute, $newAttribute"
+                )
+              }
+
+              attributesByName
+                .getOrElseUpdate(newAttribute.name, new ArrayBuffer[AttributeReference])
+                .append(newAttribute)
+            case ((originalAlias: Alias, newAlias: Alias), index) =>
+              if (leftmostOperators.containsKey(newOperator)) {
+                assert(
+                  originalAlias.exprId == newAlias.exprId,
+                  s"Alias at $index was regenerated: $originalAlias, $newAlias"
+                )
+              } else {
+                assert(
+                  originalAlias.exprId != newAlias.exprId,
+                  s"Alias at $index was not regenerated: $originalAlias, 
$newAlias"
+                )
+              }
+
+              aliasesByName.getOrElseUpdate(newAlias.name, new ArrayBuffer[Alias]).append(newAlias)
+          }
+        case (newExpressions: Seq[_], newOperator: LogicalPlan) =>
+          newExpressions.foreach {
+            case newAttribute: AttributeReference =>
+              attributesByName
+                .getOrElseUpdate(newAttribute.name, new ArrayBuffer[AttributeReference])
+                .append(newAttribute)
+            case newAlias: Alias =>
+              aliasesByName.getOrElseUpdate(newAlias.name, new ArrayBuffer[Alias]).append(newAlias)
+          }
+      }
+
+    attributesByName.values.foreach {
+      case attributes =>
+        val ids = attributes.map(attribute => attribute.exprId).distinct
+        assert(
+          ids.length == 1,
+          s"Different IDs for the same attribute in the plan: $attributes, 
$newPlan"
+        )
+    }
+    aliasesByName.values.foreach {
+      case aliases =>

Review Comment:
   the same as above
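   Presumably this refers to the same `case`-to-lambda collapse suggested for the `attributesByName` block in the next comment; for the two lines visible in this hunk, the equivalent sketch would be:
   ```suggestion
       aliasesByName.values.foreach { aliases =>
   ```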



##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/ExpressionIdAssignerSuite.scala:
##########
@@ -0,0 +1,818 @@
+    attributesByName.values.foreach {
+      case attributes =>
+        val ids = attributes.map(attribute => attribute.exprId).distinct
+        assert(
+          ids.length == 1,
+          s"Different IDs for the same attribute in the plan: $attributes, 
$newPlan"
+        )
+    }

Review Comment:
   You can convert to regular function call, I think:
   ```suggestion
       attributesByName.values.foreach { attributes =>
         val ids = attributes.map(attribute => attribute.exprId).distinct
         assert(
           ids.length == 1,
           s"Different IDs for the same attribute in the plan: $attributes, 
$newPlan"
         )
       }
   ```
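   The two forms behave identically here: a single irrefutable `case x =>` in a function literal is pattern-matching sugar for a plain lambda, so the rewrite only drops the redundant `case` keyword.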



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

