[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16213918#comment-16213918 ]
ASF GitHub Bot commented on FLINK-6094: --------------------------------------- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r146104323 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.codegen.FunctionCodeGenerator +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.DataStreamInnerJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +/** + * RelNode for a non-windowed stream join. + */ +class DataStreamJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + joinCondition: RexNode, + joinInfo: JoinInfo, + joinType: JoinRelType, + leftSchema: RowSchema, + rightSchema: RowSchema, + schema: RowSchema, + ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType(): RelDataType = schema.relDataType + + override def needsUpdatesAsRetraction: Boolean = true + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinCondition, + joinInfo, + joinType, + leftSchema, + rightSchema, + schema, + ruleDescription) + } + + def getJoinInfo: JoinInfo = joinInfo + + def getJoinType: JoinRelType = joinType + + override def toString: String = { + joinToString( + schema.relDataType, + joinCondition, + joinType, + getExpressionString) 
+ } + + override def explainTerms(pw: RelWriter): RelWriter = { + joinExplainTerms( + super.explainTerms(pw), + schema.relDataType, + joinCondition, + joinType, + getExpressionString) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + + val config = tableEnv.getConfig + val returnType = schema.typeInfo + val keyPairs = joinInfo.pairs().toList + + // get the equality keys + val leftKeys = ArrayBuffer.empty[Int] + val rightKeys = ArrayBuffer.empty[Int] + if (keyPairs.isEmpty) { + // if no equality keys => not supported + throw TableException( + "Joins should have at least one equality condition.\n" + + s"\tLeft: ${left.toString},\n" + + s"\tRight: ${right.toString},\n" + + s"\tCondition: (${joinConditionToString(schema.relDataType, + joinCondition, getExpressionString)})" + ) + } + else { + // at least one equality expression + val leftFields = left.getRowType.getFieldList + val rightFields = right.getRowType.getFieldList + + keyPairs.foreach(pair => { + val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName + val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName + + // check if keys are compatible + if (leftKeyType == rightKeyType) { + // add key pair + leftKeys.add(pair.source) + rightKeys.add(pair.target) + } else { + throw TableException( + "Equality join predicate on incompatible types.\n" + + s"\tLeft: ${left.toString},\n" + + s"\tRight: ${right.toString},\n" + + s"\tCondition: (${joinConditionToString(schema.relDataType, + joinCondition, getExpressionString)})" + ) + } + }) + } + + val leftDataStream = + left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) + val rightDataStream = + right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) + + val (connectOperator, nullCheck) = joinType match { + case JoinRelType.INNER => (leftDataStream.connect(rightDataStream), false) + case _ => throw new 
UnsupportedOperationException(s"An Unsupported JoinType [ $joinType ]") + } + + if (nullCheck && !config.getNullCheck) { + throw TableException("Null check in TableConfig must be enabled for outer joins.") --- End diff -- Outer joins have not been implemented yet: the match above returns nullCheck = false for INNER and throws for every other join type, so this null-check guard can never fire. It could be removed until outer joins are supported. > Implement stream-stream proctime non-window inner join > ------------------------------------------------------- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Hequn Cheng > > This includes: > 1. Implement stream-stream proctime non-window inner join > 2. Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)