[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264529#comment-16264529 ]
ASF GitHub Bot commented on FLINK-6094: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r152833454 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -116,14 +135,100 @@ object UpdatingPlanChecker { val windowStartEnd = w.getWindowProperties.map(_.name) // we have only a unique key if at least one window property is selected if (windowStartEnd.nonEmpty) { - keys = Some(groupKeys ++ windowStartEnd) + val smallestAttribute = windowStartEnd.sorted.head + Some((groupKeys.map(e => (e, e)) ++ windowStartEnd.map((_, smallestAttribute))).toList) + } else { + None + } + + case j: DataStreamJoin => + val joinType = j.getJoinType + joinType match { + case JoinRelType.INNER => { + // get key(s) for inner join + val lInputKeys = visit(j.getLeft) + val rInputKeys = visit(j.getRight) + if (lInputKeys.isEmpty || rInputKeys.isEmpty) { + None + } else { + // Output of inner join must have keys if left and right both contain key(s). + // Key groups from both side will be merged by join equi-predicates + val lFieldNames: Seq[String] = j.getLeft.getRowType.getFieldNames + val rFieldNames: Seq[String] = j.getRight.getRowType.getFieldNames + val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys.map(lFieldNames.get(_)) + val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys.map(rFieldNames.get(_)) + + getOutputKeysForInnerJoin( + lFieldNames ++ rFieldNames, + lInputKeys.get ++ rInputKeys.get, + lJoinKeys.zip(rJoinKeys).toList + ) + } + } + case _ => throw new UnsupportedOperationException( + s"An Unsupported JoinType [ $joinType ]") } case _: DataStreamRel => - // anything else does not forward keys or might duplicate key, so we can stop - keys = None + // anything else does not forward keys, so we can stop + None } } - } + def getOutputKeysForInnerJoin( + inNames: Seq[String], + inKeys: List[(String, String)], + joinKeys: List[(String, String)]) + : Option[List[(String, String)]] = { + + val nameToGroups = mutable.HashMap.empty[String,String] + + // merge two groups + def merge(nameA: String, nameB: String): Unit = { + val ga: String = findGroup(nameA); --- End diff -- Remove semicolons. > Implement stream-stream proctime non-window inner join > ------------------------------------------------------- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)