godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r507511648



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -175,29 +186,32 @@ class DeadlockBreakupProcessor extends DAGProcessor {
               probeRel,
               distribution)
             e.setRequiredShuffleMode(ShuffleMode.BATCH)
-            // replace join node's input
-            join.replaceInputNode(probeSideIndex, e)
+            // replace node's input
+            node.asInstanceOf[BatchExecNode[_]].replaceInputNode(probeSideIndex, e)
         }
       }
     }
 
     override def visit(node: ExecNode[_, _]): Unit = {
       super.visit(node)
-      node match {
-        case hashJoin: BatchExecHashJoin =>
-          val joinInfo = hashJoin.getJoinInfo
-          val columns = if (hashJoin.leftIsBuild) joinInfo.rightKeys else joinInfo.leftKeys
-          val distribution = FlinkRelDistribution.hash(columns)
-          rewriteJoin(hashJoin, hashJoin.leftIsBuild, distribution)
-        case nestedLoopJoin: BatchExecNestedLoopJoin =>
-          rewriteJoin(nestedLoopJoin, nestedLoopJoin.leftIsBuild, FlinkRelDistribution.ANY)
-        case _ => // do nothing
+      val inputEdges = node.getInputEdges
+      if (inputEdges.size() == 2) {
+        val leftPriority = inputEdges.get(0).getPriority
+        val rightPriority = inputEdges.get(1).getPriority
+        val requiredShuffle = if (leftPriority == 1) {

Review comment:
       not changed here

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -296,44 +311,48 @@ class DeadlockBreakupProcessor extends DAGProcessor {
     * Returns true if all input-paths have barrier node (e.g. agg, sort), otherwise false.
     */
   private def hasBarrierNodeInInputPaths(
-      inputPathsOfProbeSide: List[Array[ExecNode[_, _]]]): Boolean = {
-    require(inputPathsOfProbeSide.nonEmpty)
+      inputPathsOfLowerInput: List[Array[ExecNode[_, _]]]): Boolean = {
+    require(inputPathsOfLowerInput.nonEmpty)
 
-    /** Return true if the successor of join in the input-path is build node, otherwise false */
-    def checkJoinBuildSide(
-        buildNode: ExecNode[_, _],
-        idxOfJoin: Int,
+    /** Return true if the successor in the input-path is also in higher input, otherwise false */
+    def checkHigherInput(
+        higherNode: ExecNode[_, _],
+        idx: Int,
         inputPath: Array[ExecNode[_, _]]): Boolean = {
-      if (idxOfJoin < inputPath.length - 1) {
-        val nextNode = inputPath(idxOfJoin + 1)
-        // next node is build node of hash join
-        buildNode eq nextNode
+      if (idx < inputPath.length - 1) {
+        val nextNode = inputPath(idx + 1)
+        // next node is higher input
+        higherNode eq nextNode
       } else {
         false
       }
     }
 
-    inputPathsOfProbeSide.forall {
+    inputPathsOfLowerInput.forall {
       inputPath =>
         var idx = 0
         var hasFullDamNode = false
         // should exclude the reused node (at last position in path)
         while (!hasFullDamNode && idx < inputPath.length - 1) {
           val node = inputPath(idx)
-          val nodeDamBehavior = node.asInstanceOf[BatchExecNode[_]].getDamBehavior
-          hasFullDamNode = if (nodeDamBehavior == DamBehavior.FULL_DAM) {
+          val atLeastEndInput = node.getInputEdges.forall(
+            e => e.getDamBehavior.stricterOrEqual(ExecEdge.DamBehavior.END_INPUT))
+          hasFullDamNode = if (atLeastEndInput) {
             true
           } else {
-            node match {
-              case h: BatchExecHashJoin =>
-                val buildSideIndex = if (h.leftIsBuild) 0 else 1
-                val buildNode = h.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case n: BatchExecNestedLoopJoin =>
-                val buildSideIndex = if (n.leftIsBuild) 0 else 1
-                val buildNode = n.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case _ => false
+            val inputEdges = node.getInputEdges
+            if (inputEdges.size() == 2) {
+              val leftPriority = inputEdges.get(0).getPriority
+              val rightPriority = inputEdges.get(1).getPriority
+              if (leftPriority != rightPriority) {
+                val higherIndex = if (leftPriority == 0) 0 else 1

Review comment:
       not changed here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to