mbutrovich commented on code in PR #4429:
URL: https://github.com/apache/datafusion-comet/pull/4429#discussion_r3363491497
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
}
}
+case class CometBroadcastNestedLoopJoinExec(
Review Comment:
Should this define a `metrics` map like `CometMetricNode.hashJoinMetrics`
does for BHJ (and `sortMergeJoinMetrics` for SMJ)? As written it inherits
`baselineMetrics`, so the native NLJ's build/probe/join timing metrics won't
surface in the Spark UI.
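The concern can be modeled roughly with the standalone sketch below. It assumes, without checking against this PR, that native metric values are copied onto the Spark-side metrics map by name and that names missing from that map are dropped. The metric names are assumptions loosely modeled on DataFusion's build/probe join metrics. `nestedLoopJoinNames` is a hypothetical stand-in for whatever an NLJ-specific map would contain.

```scala
// Hypothetical model, not Comet code. Assumption: native metrics are copied onto the
// Spark-side metrics map by name, and names missing from that map are dropped.
object MetricsSurfacingSketch {
  // Assumed names for an operator that only inherits baseline metrics.
  val baselineNames: Set[String] = Set("output_rows", "elapsed_compute")

  // Hypothetical NLJ-specific map, analogous to what hashJoinMetrics adds for BHJ.
  // The extra names are assumptions modeled on DataFusion's build/probe join metrics.
  val nestedLoopJoinNames: Set[String] =
    baselineNames ++ Set("build_time", "build_input_rows", "join_time", "input_rows")

  // Keep only the native metrics whose names the operator declares.
  def surfaced(declared: Set[String], native: Map[String, Long]): Map[String, Long] =
    native.filter { case (name, _) => declared.contains(name) }
}
```

In this model, declaring only the baseline names means `build_time` and `join_time` are reported natively but never reach the UI. Declaring an NLJ-specific set, the way `hashJoinMetrics` does for BHJ, keeps them.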
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
}
}
+case class CometBroadcastNestedLoopJoinExec(
+ override val nativeOp: Operator,
+ override val originalPlan: SparkPlan,
+ override val output: Seq[Attribute],
+ override val outputOrdering: Seq[SortOrder],
+ joinType: JoinType,
+ condition: Option[Expression],
+ buildSide: BuildSide,
+ override val left: SparkPlan,
+ override val right: SparkPlan,
+ override val serializedPlanOpt: SerializedPlan)
+ extends CometBinaryExec {
+
+ // Mirror Spark's BroadcastNestedLoopJoinExec: output partitioning derives from the streamed
+ // (non-broadcast) side. Reading from live left/right rather than originalPlan keeps this
+ // correct after an AQE child swap.
+ private def streamedPlan: SparkPlan = buildSide match {
+ case BuildLeft => right
+ case BuildRight => left
+ }
+
+ override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
+
+ override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan =
+ this.copy(left = newLeft, right = newRight)
+
+ override def stringArgs: Iterator[Any] =
+ Iterator(joinType, buildSide, condition, left, right)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case other: CometBroadcastNestedLoopJoinExec =>
+ this.output == other.output &&
+ this.joinType == other.joinType &&
+ this.condition == other.condition &&
+ this.buildSide == other.buildSide &&
+ this.left == other.left &&
+ this.right == other.right &&
+ this.serializedPlanOpt == other.serializedPlanOpt
+ case _ =>
+ false
+ }
+ }
+
+ override def hashCode(): Int =
+ Objects.hashCode(output, joinType, condition, buildSide, left, right)
+}
+
+object CometBroadcastNestedLoopJoinExec extends CometOperatorSerde[BroadcastNestedLoopJoinExec] {
+
+ /**
+ * Get the optional Comet configuration entry that is used to enable or disable native support
+ * for this operator.
+ */
+ override def enabledConfig: Option[ConfigEntry[Boolean]] = {
+ Some(CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED)
+ }
+
+ private val broadcastBuildReplicationReason =
+ "BNLJ combinations that emit per-build-row results need a cross-partition merge that" +
+ " DataFusion's NestedLoopJoinExec does not provide. Affects: LeftOuter+BuildLeft," +
+ " RightOuter+BuildRight, FullOuter, LeftSemi+BuildLeft, LeftAnti+BuildLeft."
+
+ override def getSupportLevel(op: BroadcastNestedLoopJoinExec): SupportLevel =
Review Comment:
The support matrix here lines up exactly with Spark's
`BroadcastNestedLoopJoinExec.outputPartitioning` supported set, and the
reasoning in `broadcastBuildReplicationReason` is spot on. Every case returns
either `Compatible(None)` or `Unsupported(...)`, never `Incompatible`. Since
`isOperatorEnabled` only consults the allow-incompat config on the
`Incompatible` branch, the
`getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec")` line in the
`CometJoinSuite` override is dead. Is that intentional, or a leftover?
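The reasoning in `broadcastBuildReplicationReason` can be made concrete with the small standalone model below. It uses hypothetical names and plain Ints instead of rows, and it is not Comet or DataFusion code. It covers LeftOuter with BuildLeft: each streamed partition sees the whole broadcast build side, so deciding per partition which build rows went unmatched both duplicates and invents null-padded rows.

```scala
// Hypothetical model, not Comet or DataFusion code: a LeftOuter join whose left (build)
// side is broadcast to every partition of the right (streamed) side. Rows are plain Ints,
// and None stands for the null-padded right column.
object BroadcastBuildOuterSketch {
  type Row = (Int, Option[Int])

  private def joined(l: Int, r: Int): Row = (l, Some(r))
  private def padded(l: Int): Row = (l, None)

  // Naive: every partition decides on its own which build rows were unmatched.
  def naivePerPartition(build: Seq[Int], streamed: Seq[Seq[Int]],
                        cond: (Int, Int) => Boolean): Seq[Row] =
    streamed.flatMap { part =>
      build.flatMap { l =>
        val matches = part.filter(r => cond(l, r))
        if (matches.isEmpty) Seq(padded(l)) else matches.map(r => joined(l, r))
      }
    }

  // Correct: emit matches per partition, merge "matched anywhere?" across partitions,
  // then null-pad each globally unmatched build row exactly once.
  def withGlobalMerge(build: Seq[Int], streamed: Seq[Seq[Int]],
                      cond: (Int, Int) => Boolean): Seq[Row] = {
    val matched: Seq[Row] = streamed.flatMap { part =>
      for { l <- build; r <- part if cond(l, r) } yield joined(l, r)
    }
    val matchedAnywhere: Set[Int] = matched.map(_._1).toSet
    matched ++ build.filterNot(l => matchedAnywhere.contains(l)).map(padded)
  }
}
```

Take build rows `Seq(1, 2, 9)`, streamed partitions `Seq(Seq(2), Seq(5))`, and condition `l < r`. The naive version null-pads `9` once per partition and also null-pads `2` in the first partition, although `2` matches in the second. The merged version yields the same rows as a single-partition join. That cross-partition merge is what the Unsupported combinations would need.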
##########
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala:
##########
@@ -702,4 +705,102 @@ class CometJoinSuite extends CometTestBase {
}
}
}
+
+ test("BroadcastNestedLoopJoin with unequal filter") {
Review Comment:
The compatible matrix is well covered. Could we add a couple of result-only
`checkSparkAnswer` cases for the unsupported combinations (`FullOuter`,
`LeftOuter` with `BuildLeft`) to confirm they fall back to Spark and still
match? A broadcast-reuse case (same relation feeding two BNLJs under AQE) would
also be reassuring given reuse is handled generically rather than in this
operator. Minor: no coverage for a join condition referencing nullable or
null-producing columns, and no left-outer-without-condition case.
##########
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala:
##########
@@ -38,7 +38,10 @@ class CometJoinSuite extends CometTestBase {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
- withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+ withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec") -> "true") {
Review Comment:
Similar to the other comment in operators.scala. This allow-incompat config
is never read for BNLJ. `getSupportLevel` only ever returns `Compatible(None)`
or `Unsupported(...)`, never `Incompatible`, and `isOperatorEnabled` only
consults allow-incompat on the `Incompatible` branch. Can this be dropped, or
was an `Incompatible` case intended?
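The gating can be modeled minimally in standalone Scala. The sketch uses local stand-ins for Spark's `JoinType`/`BuildSide` and Comet's `SupportLevel`. The names mirror the real ones, but `supportLevel` and `isOperatorEnabled` are hypothetical paraphrases of the behavior described above, not Comet's code. Enumerating every combination shows the allow-incompat callback is never invoked.

```scala
// Hypothetical, simplified model of the gating described in this review. The types are
// local stand-ins (a subset of Spark's join types); this is not Comet's implementation.
object SupportGatingSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait SupportLevel
  final case class Compatible(notes: Option[String]) extends SupportLevel
  final case class Incompatible(reason: String) extends SupportLevel
  final case class Unsupported(reason: String) extends SupportLevel

  // Matrix from broadcastBuildReplicationReason: per-build-row combinations are
  // Unsupported, everything else is Compatible. Nothing returns Incompatible.
  def supportLevel(joinType: JoinType, buildSide: BuildSide): SupportLevel =
    (joinType, buildSide) match {
      case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) | (FullOuter, _) |
          (LeftSemi, BuildLeft) | (LeftAnti, BuildLeft) =>
        Unsupported("needs a cross-partition merge of build-side match state")
      case _ => Compatible(None)
    }

  // Paraphrase of the described behavior: allow-incompat is read only for Incompatible.
  def isOperatorEnabled(level: SupportLevel, allowIncompat: () => Boolean): Boolean =
    level match {
      case Compatible(_)   => true
      case Incompatible(_) => allowIncompat()
      case Unsupported(_)  => false
    }
}
```

In this model, the allow-incompat key becomes reachable only if an `Incompatible` case is added to the matrix.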
--
This is an automated message from the Apache Git Service.