[ 
https://issues.apache.org/jira/browse/SPARK-53143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-53143:
-----------------------------------

    Assignee: David Milicevic

> Fix self join in DataFrame API - Join is not the only expected output from analyzer
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-53143
>                 URL: https://issues.apache.org/jira/browse/SPARK-53143
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: David Milicevic
>            Assignee: David Milicevic
>            Priority: Major
>              Labels: pull-request-available
>
> There is an edge case with self joins when multiple joins are present.
>  
> Example: two joins, where the latter is a self join.
> The first is a "using" join - in this case, the analyzer's
> `ResolveNaturalAndUsingJoin` rule adds a `Project` as the top node.
> The second join is a self join with an explicit join condition (i.e.
> `joinExprs`) - if the condition references columns that are not part of the
> project list (of the first join), the `AddMetadataColumns` rule kicks in to
> add metadata for those columns. As a consequence, a `Project` is added on top
> of the joined plan to return the original/expected list of projected columns.
>  
> While a similar situation (a `Project` node on top) can arise in several
> other cases, from the `Dataset` perspective the issue is specific to self
> joins, because `resolveSelfJoinCondition` assumed that the analyzed plan is
> always of `Join` type.
>  
> Minimalist PySpark repro:
> {code:python}
> from pyspark.sql.functions import col
>
> spark.sql("CREATE TABLE IF NOT EXISTS table_11 (id INT);")
> spark.sql("CREATE TABLE IF NOT EXISTS table_12 (id INT, col_1 STRING);")
> df = spark.table("table_12").where("col_1 = 'test'").select("id")
> spark.table("table_11").alias("t") \
>   .join(df.alias("df1"), on=["id"]) \
>   .join(df.alias("df2"), col("df1.id") == col("df2.id"), how="left"){code}
> This throws an exception:
> {code:java}
> java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to class org.apache.spark.sql.catalyst.plans.logical.Join (org.apache.spark.sql.catalyst.plans.logical.Project and org.apache.spark.sql.catalyst.plans.logical.Join are in unnamed module of loader 'app')
>       at org.apache.spark.sql.classic.Dataset.resolveSelfJoinCondition(Dataset.scala:665)
>       at org.apache.spark.sql.classic.Dataset.join(Dataset.scala:690)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>       at py4j.Gateway.invoke(Gateway.java:282)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
>       at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
>       at java.base/java.lang.Thread.run(Thread.java:840){code}
>  
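
A minimal sketch of the plan shape described above, using simplified stand-in classes (these are NOT Spark's actual Catalyst node classes): the analyzed plan the self join sees is a `Project` wrapping the `Join`, so code that casts the plan directly to `Join` fails, while code that first descends through any `Project` wrapper does not. The `find_join` helper below is purely illustrative, not Spark's actual fix.

```python
from dataclasses import dataclass

# Simplified stand-ins for Catalyst logical plan nodes (not Spark's classes).
@dataclass
class Join:
    left: str
    right: str

@dataclass
class Project:
    project_list: list
    child: object

def find_join(plan):
    """Descend through any Project wrappers (e.g. added by AddMetadataColumns)
    to reach the underlying Join, instead of assuming the analyzed plan is
    the Join itself -- the assumption that caused the ClassCastException."""
    while isinstance(plan, Project):
        plan = plan.child
    if not isinstance(plan, Join):
        raise TypeError(f"expected a Join, got {type(plan).__name__}")
    return plan

# Plan shaped like the repro: a Project on top of the self join.
analyzed = Project(["id"], Join("t", "df2"))
print(find_join(analyzed))  # Join(left='t', right='df2'), no cast error
```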



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
