coderfender commented on code in PR #4429:
URL: https://github.com/apache/datafusion-comet/pull/4429#discussion_r3367795619


##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
   }
 }
 
+case class CometBroadcastNestedLoopJoinExec(
+    override val nativeOp: Operator,
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    override val outputOrdering: Seq[SortOrder],
+    joinType: JoinType,
+    condition: Option[Expression],
+    buildSide: BuildSide,
+    override val left: SparkPlan,
+    override val right: SparkPlan,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometBinaryExec {
+
+  // Mirror Spark's BroadcastNestedLoopJoinExec: output partitioning derives 
from the streamed
+  // (non-broadcast) side. Reading from live left/right rather than 
originalPlan keeps this
+  // correct after an AQE child swap.
+  private def streamedPlan: SparkPlan = buildSide match {
+    case BuildLeft => right
+    case BuildRight => left
+  }
+
+  override def outputPartitioning: Partitioning = 
streamedPlan.outputPartitioning
+
+  override def withNewChildrenInternal(newLeft: SparkPlan, newRight: 
SparkPlan): SparkPlan =
+    this.copy(left = newLeft, right = newRight)
+
+  override def stringArgs: Iterator[Any] =
+    Iterator(joinType, buildSide, condition, left, right)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometBroadcastNestedLoopJoinExec =>
+        this.output == other.output &&
+        this.joinType == other.joinType &&
+        this.condition == other.condition &&
+        this.buildSide == other.buildSide &&
+        this.left == other.left &&
+        this.right == other.right &&
+        this.serializedPlanOpt == other.serializedPlanOpt
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int =
+    Objects.hashCode(output, joinType, condition, buildSide, left, right)
+}
+
+object CometBroadcastNestedLoopJoinExec extends 
CometOperatorSerde[BroadcastNestedLoopJoinExec] {
+
+  /**
+   * Get the optional Comet configuration entry that is used to enable or 
disable native support
+   * for this operator.
+   */
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = {
+    Some(CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED)
+  }
+
+  private val broadcastBuildReplicationReason =
+    "BNLJ combinations that emit per-build-row results need a cross-partition 
merge that" +
+      " DataFusion's NestedLoopJoinExec does not provide. Affects: 
LeftOuter+BuildLeft," +
+      " RightOuter+BuildRight, FullOuter, LeftSemi+BuildLeft, 
LeftAnti+BuildLeft."
+
+  override def getSupportLevel(op: BroadcastNestedLoopJoinExec): SupportLevel =

Review Comment:
   Agreed. I was initially gating the join behind the config in the test suite 
so I could work out the support matrix through testing. I have since removed the 
now-redundant config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to