[ https://issues.apache.org/jira/browse/SPARK-53143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-53143.
---------------------------------
    Fix Version/s: 4.1.0
       Resolution: Fixed

Issue resolved by pull request 51873
[https://github.com/apache/spark/pull/51873]

> Fix self join in DataFrame API - Join is not the only expected output from analyzer
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-53143
>                 URL: https://issues.apache.org/jira/browse/SPARK-53143
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: David Milicevic
>            Assignee: David Milicevic
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.1.0
>
> There is an edge case with self join when multiple joins are present.
>
> Example: two joins, where the latter one is a self join.
> The first is a "using" join - in this case, the analyzer's
> `ResolveNaturalAndUsingJoin` rule adds a `Project` as the top node.
> The second join is a self join, but with an explicit join condition (i.e.
> `joinExprs`) - if the join condition uses columns that are not part of the
> project list (of the first join), the `AddMetadataColumns` rule is hit to add
> metadata for those columns. As a consequence, a `Project` is added on top of
> the joined plan to restore the original/expected list of projected columns.
>
> While a similar shape (a `Project` node on top) can occur in multiple other
> cases, from the `Dataset` perspective the issue is specific to self joins,
> because `resolveSelfJoinCondition` assumed that the analyzed plan would always
> be of `Join` type.
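The failure mode described above can be modeled in a few lines of plain Python. This is a hedged sketch, not Catalyst code: the `Join` and `Project` classes and the `resolve_assuming_join` / `resolve_shape_tolerant` functions below are hypothetical stand-ins for Spark's logical plan nodes and for the buggy versus shape-tolerant resolution logic; the actual fix is the one merged in pull request 51873.

```python
# Toy stand-ins for Catalyst's logical plan nodes (hypothetical, for illustration).
from dataclasses import dataclass


@dataclass
class Join:
    left: str
    right: str


@dataclass
class Project:
    project_list: list
    child: object  # the wrapped child plan


def resolve_assuming_join(analyzed):
    """Mimics the buggy assumption: the analyzed plan is always a Join."""
    if not isinstance(analyzed, Join):
        # Analogous to the ClassCastException at Dataset.scala:665.
        raise TypeError(f"{type(analyzed).__name__} cannot be cast to Join")
    return analyzed


def resolve_shape_tolerant(analyzed):
    """Fix sketch: strip Project wrappers that analyzer rules (e.g.
    ResolveNaturalAndUsingJoin, AddMetadataColumns) may have added on top."""
    plan = analyzed
    while isinstance(plan, Project):
        plan = plan.child
    if not isinstance(plan, Join):
        raise TypeError("expected a Join underneath any Project wrappers")
    return plan


# After the second (self) join, the analyzer may return either shape:
bare = Join("t", "df2")
wrapped = Project(["id"], Join("t", "df2"))  # Project restored on top

resolve_assuming_join(bare)       # works: plan really is a Join
resolve_shape_tolerant(wrapped)   # works: finds the Join under the Project
# resolve_assuming_join(wrapped)  # raises TypeError, like the repro's crash
```

The essential difference is that the tolerant version walks through any `Project` wrappers before expecting a `Join`, which mirrors why only the second, condition-based self join after a "using" join triggers the crash.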
>
> Minimal PySpark repro:
> {code:java}
> spark.sql("CREATE TABLE IF NOT EXISTS table_11 (id INT);")
> spark.sql("CREATE TABLE IF NOT EXISTS table_12 (id INT, col_1 STRING);")
> df = spark.table("table_12").where("col_1 = 'test'").select("id")
> spark.table("table_11").alias("t") \
>     .join(df.alias("df1"), on=["id"]) \
>     .join(df.alias("df2"), col("df1.id") == col("df2.id"), how="left"){code}
> Throws an exception:
> {code:java}
> java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to class org.apache.spark.sql.catalyst.plans.logical.Join (org.apache.spark.sql.catalyst.plans.logical.Project and org.apache.spark.sql.catalyst.plans.logical.Join are in unnamed module of loader 'app')
>     at org.apache.spark.sql.classic.Dataset.resolveSelfJoinCondition(Dataset.scala:665)
>     at org.apache.spark.sql.classic.Dataset.join(Dataset.scala:690)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>     at py4j.Gateway.invoke(Gateway.java:282)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
>     at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
>     at java.base/java.lang.Thread.run(Thread.java:840){code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail:
issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org