[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655138#comment-16655138
 ] 

ASF GitHub Bot commented on FLINK-9715:
---------------------------------------

pnowojski closed pull request #6776: [FLINK-9715][table] Support temporal join 
with event time
URL: https://github.com/apache/flink/pull/6776
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
index d54fd785862..3a38bc58800 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -109,7 +109,7 @@ class DataStreamJoin(
     val joinTranslator = createTranslator(tableEnv)
 
     val joinOpName = joinToString(getRowType, joinCondition, joinType, 
getExpressionString)
-    val coProcessFunction = joinTranslator.getCoProcessFunction(
+    val joinOperator = joinTranslator.getJoinOperator(
       joinType,
       schema.fieldNames,
       ruleDescription,
@@ -118,9 +118,10 @@ class DataStreamJoin(
       .keyBy(
         joinTranslator.getLeftKeySelector(),
         joinTranslator.getRightKeySelector())
-      .process(coProcessFunction)
-      .name(joinOpName)
-      .returns(CRowTypeInfo(schema.typeInfo))
+      .transform(
+        joinOpName,
+        CRowTypeInfo(schema.typeInfo),
+        joinOperator)
   }
 
   private def validateKeyTypes(): Unit = {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 054476a7036..e8f9ff4efdb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
@@ -58,11 +59,11 @@ class DataStreamJoinToCoProcessTranslator(
       rightSchema.projectedTypeInfo(joinInfo.rightKeys.toIntArray))
   }
 
-  def getCoProcessFunction(
+  def getJoinOperator(
       joinType: JoinRelType,
       returnFieldNames: Seq[String],
       ruleDescription: String,
-      queryConfig: StreamQueryConfig): CoProcessFunction[CRow, CRow, CRow] = {
+      queryConfig: StreamQueryConfig): TwoInputStreamOperator[CRow, CRow, 
CRow] = {
     // input must not be nullable, because the runtime join function will make 
sure
     // the code-generated function won't process null inputs
     val generator = new FunctionCodeGenerator(
@@ -97,16 +98,16 @@ class DataStreamJoinToCoProcessTranslator(
       body,
       returnType)
 
-    createCoProcessFunction(joinType, queryConfig, genFunction)
+    createJoinOperator(joinType, queryConfig, genFunction)
   }
 
-  protected def createCoProcessFunction(
+  protected def createJoinOperator(
     joinType: JoinRelType,
     queryConfig: StreamQueryConfig,
     genFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
-    : CoProcessFunction[CRow, CRow, CRow] = {
+    : TwoInputStreamOperator[CRow, CRow, CRow] = {
 
-    joinType match {
+    val joinFunction = joinType match {
       case JoinRelType.INNER =>
         new NonWindowInnerJoin(
           leftSchema.typeInfo,
@@ -145,5 +146,6 @@ class DataStreamJoinToCoProcessTranslator(
           genFunction.code,
           queryConfig)
     }
+    new KeyedCoProcessOperator(joinFunction)
   }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
index 467e84f3907..25a14ef84e5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
@@ -22,19 +22,22 @@ import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory._
 import org.apache.flink.table.codegen.GeneratedFunction
 import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
 import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.util.RexDefaultVisitor
-import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.join.{TemporalProcessTimeJoin, 
TemporalRowtimeJoin}
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions.checkState
 
+/**
+  * @param rightTimeAttributeInputReference is defined only for event time 
joins.
+  */
 class DataStreamTemporalJoinToCoProcessTranslator private (
     textualRepresentation: String,
     config: TableConfig,
@@ -43,9 +46,8 @@ class DataStreamTemporalJoinToCoProcessTranslator private (
     rightSchema: RowSchema,
     joinInfo: JoinInfo,
     rexBuilder: RexBuilder,
-    leftTimeAttribute: RexNode,
-    rightTimeAttribute: Option[RexNode],
-    rightPrimaryKeyExpression: RexNode,
+    leftTimeAttributeInputReference: Int,
+    rightTimeAttributeInputReference: Option[Int],
     remainingNonEquiJoinPredicates: RexNode)
   extends DataStreamJoinToCoProcessTranslator(
     config,
@@ -57,25 +59,32 @@ class DataStreamTemporalJoinToCoProcessTranslator private (
 
   override val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
 
-  override protected def createCoProcessFunction(
+  override protected def createJoinOperator(
       joinType: JoinRelType,
       queryConfig: StreamQueryConfig,
       joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
-    : CoProcessFunction[CRow, CRow, CRow] = {
-
-    if (rightTimeAttribute.isDefined) {
-      throw new ValidationException(
-        s"Currently only proctime temporal joins are supported in 
[$textualRepresentation]")
-    }
+    : TwoInputStreamOperator[CRow, CRow, CRow] = {
 
     joinType match {
       case JoinRelType.INNER =>
-        new TemporalJoin(
-          leftSchema.typeInfo,
-          rightSchema.typeInfo,
-          joinFunction.name,
-          joinFunction.code,
-          queryConfig)
+        if (rightTimeAttributeInputReference.isDefined) {
+          new TemporalRowtimeJoin(
+            leftSchema.typeInfo,
+            rightSchema.typeInfo,
+            joinFunction.name,
+            joinFunction.code,
+            queryConfig,
+            leftTimeAttributeInputReference,
+            rightTimeAttributeInputReference.get)
+        }
+        else {
+          new TemporalProcessTimeJoin(
+            leftSchema.typeInfo,
+            rightSchema.typeInfo,
+            joinFunction.name,
+            joinFunction.code,
+            queryConfig)
+        }
       case _ =>
        throw new ValidationException(
          s"Only ${JoinRelType.INNER} temporal join is supported in 
[$textualRepresentation]")
@@ -121,12 +130,25 @@ object DataStreamTemporalJoinToCoProcessTranslator {
       rightSchema,
       joinInfo,
       rexBuilder,
-      temporalJoinConditionExtractor.leftTimeAttribute.get,
-      temporalJoinConditionExtractor.rightTimeAttribute,
-      temporalJoinConditionExtractor.rightPrimaryKeyExpression.get,
+      extractInputReference(
+        temporalJoinConditionExtractor.leftTimeAttribute.get,
+        textualRepresentation),
+      temporalJoinConditionExtractor.rightTimeAttribute.map(
+        rightTimeAttribute =>
+          extractInputReference(rightTimeAttribute, textualRepresentation) - 
leftSchema.arity),
       remainingNonEquiJoinPredicates)
   }
 
+  private def extractInputReference(rexNode: RexNode, textualRepresentation: 
String): Int = {
+    val inputReferenceVisitor = new 
InputReferenceVisitor(textualRepresentation)
+    rexNode.accept(inputReferenceVisitor)
+    checkState(
+      inputReferenceVisitor.inputReference.isDefined,
+      "Failed to find input reference in [%s]",
+      textualRepresentation)
+    inputReferenceVisitor.inputReference.get
+  }
+
   private class TemporalJoinConditionExtractor(
       textualRepresentation: String,
       rightKeysStartingOffset: Int,
@@ -148,8 +170,8 @@ object DataStreamTemporalJoinToCoProcessTranslator {
 
       checkState(
         leftTimeAttribute.isEmpty
-        && rightPrimaryKeyExpression.isEmpty
-        && rightTimeAttribute.isEmpty,
+          && rightPrimaryKeyExpression.isEmpty
+          && rightTimeAttribute.isEmpty,
         "Multiple %s functions in [%s]",
         TEMPORAL_JOIN_CONDITION,
         textualRepresentation)
@@ -170,8 +192,6 @@ object DataStreamTemporalJoinToCoProcessTranslator {
             s"Non rowtime timeAttribute [${leftTimeAttribute.get.getType}] " +
               s"passed as the argument to TemporalTableFunction")
         }
-
-        throw new TableException("Event time temporal joins are not yet 
supported.")
       }
       else if (LogicalTemporalTableJoin.isProctimeCall(call)) {
         leftTimeAttribute = Some(call.getOperands.get(0))
@@ -196,30 +216,27 @@ object DataStreamTemporalJoinToCoProcessTranslator {
           s"Only single column join key is supported. " +
             s"Found ${joinInfo.rightKeys} in [$textualRepresentation]")
       }
-      val rightKey = joinInfo.rightKeys.get(0) + rightKeysStartingOffset
+      val rightJoinKeyInputReference = joinInfo.rightKeys.get(0) + 
rightKeysStartingOffset
 
-      val primaryKeyVisitor = new PrimaryKeyVisitor(textualRepresentation)
-      rightPrimaryKey.accept(primaryKeyVisitor)
+      val rightPrimaryKeyInputReference = extractInputReference(
+        rightPrimaryKey,
+        textualRepresentation)
 
-      primaryKeyVisitor.inputReference match {
-        case None =>
-          throw new IllegalStateException(
-            s"Failed to find primary key reference in 
[$textualRepresentation]")
-        case Some(primaryKeyInputReference) if primaryKeyInputReference != 
rightKey =>
-          throw new ValidationException(
-            s"Join key [$rightKey] must be the same as " +
-              s"temporal table's primary key [$primaryKeyInputReference] " +
-              s"in [$textualRepresentation]")
-        case _ =>
-          rightPrimaryKey
+      if (rightPrimaryKeyInputReference != rightJoinKeyInputReference) {
+        throw new ValidationException(
+          s"Join key [$rightJoinKeyInputReference] must be the same as " +
+            s"temporal table's primary key [$rightPrimaryKey] " +
+            s"in [$textualRepresentation]")
       }
+
+      rightPrimaryKey
     }
   }
 
   /**
-    * Extracts input references from primary key expression.
+    * Extracts input references from RexNode.
     */
-  private class PrimaryKeyVisitor(textualRepresentation: String)
+  private class InputReferenceVisitor(textualRepresentation: String)
     extends RexDefaultVisitor[RexNode] {
 
     var inputReference: Option[Int] = None
@@ -231,7 +248,7 @@ object DataStreamTemporalJoinToCoProcessTranslator {
 
     override def visitNode(rexNode: RexNode): RexNode = {
       throw new ValidationException(
-        s"Unsupported right primary key expression [$rexNode] in 
[$textualRepresentation]")
+        s"Unsupported expression [$rexNode] in [$textualRepresentation]. 
Expected input reference")
     }
   }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
similarity index 74%
rename from 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
rename to 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
index 5087515247f..175af91e9e4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingCollector
@@ -29,15 +29,15 @@ import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 
-class TemporalJoin(
+class TemporalProcessTimeJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
-  extends CoProcessFunction[CRow, CRow, CRow]
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
   with Compiler[FlatJoinFunction[Row, Row, Row]]
   with Logging {
 
@@ -46,10 +46,11 @@ class TemporalJoin(
 
   protected var rightState: ValueState[Row] = _
   protected var cRowWrapper: CRowWrappingCollector = _
+  protected var collector: TimestampedCollector[CRow] = _
 
   protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
 
-  override def open(parameters: Configuration): Unit = {
+  override def open(): Unit = {
     val clazz = compile(
       getRuntimeContext.getUserCodeClassLoader,
       genJoinFuncName,
@@ -60,32 +61,27 @@ class TemporalJoin(
     val rightStateDescriptor = new ValueStateDescriptor[Row]("right", 
rightType)
     rightState = getRuntimeContext.getState(rightStateDescriptor)
 
+    collector = new TimestampedCollector[CRow](output)
     cRowWrapper = new CRowWrappingCollector()
+    cRowWrapper.out = collector
   }
 
-  override def processElement1(
-      value: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
+  override def processElement1(element: StreamRecord[CRow]): Unit = {
 
     if (rightState.value() == null) {
       return
     }
 
-    cRowWrapper.out = out
-    cRowWrapper.setChange(value.change)
+    cRowWrapper.setChange(element.getValue.change)
 
     val rightSideRow = rightState.value()
-    joinFunction.join(value.row, rightSideRow, cRowWrapper)
+    joinFunction.join(element.getValue.row, rightSideRow, cRowWrapper)
   }
 
-  override def processElement2(
-      value: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
+  override def processElement2(element: StreamRecord[CRow]): Unit = {
 
-    if (value.change) {
-      rightState.update(value.row)
+    if (element.getValue.change) {
+      rightState.update(element.getValue.row)
     } else {
       rightState.clear()
     }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
new file mode 100644
index 00000000000..17f8e1a233f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.{Comparator, Optional}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping collections of probe and build records in 
state, to be processed
+  * on the next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in a 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how emitting the results and cleaning up are triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However, this would cause a huge number of registered 
timers. For example,
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that,
+  * we always keep only a single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig,
+    leftTimeAttribute: Int,
+    rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+    * Incremental index generator for `leftState`'s keys.
+    */
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+    * Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+    * We cannot use List to accumulate Rows, because we need efficient 
deletes of the oldest rows.
+    *
+    * TODO: this could be OrderedMultiMap[JLong, Row] indexed by row's 
timestamp, to avoid
+    * full map traversals (if we have lots of rows in the state that exceed 
`currentWatermark`).
+    */
+  private var leftState: MapState[JLong, Row] = _
+
+  /**
+    * Mapping from timestamp to right side `Row`.
+    *
+    * TODO: having `rightState` as an OrderedMapState would allow us to avoid 
sorting cost
+    * once per watermark
+    */
+  private var rightState: MapState[JLong, Row] = _
+
+  private var registeredTimer: ValueState[JLong] = _ // JLong for correct 
handling of default null
+
+  private var cRowWrapper: CRowWrappingCollector = _
+  private var collector: TimestampedCollector[CRow] = _
+  private var timerService: SimpleTimerService = _
+
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(): Unit = {
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+
+    joinFunction = clazz.newInstance()
+
+    nextLeftIndex = getRuntimeContext.getState(
+      new ValueStateDescriptor[JLong](NEXT_LEFT_INDEX_STATE_NAME, 
BasicTypeInfo.LONG_TYPE_INFO))
+    leftState = getRuntimeContext.getMapState(
+      new MapStateDescriptor[JLong, Row](LEFT_STATE_NAME, 
BasicTypeInfo.LONG_TYPE_INFO, leftType))
+    rightState = getRuntimeContext.getMapState(
+      new MapStateDescriptor[JLong, Row](RIGHT_STATE_NAME, 
BasicTypeInfo.LONG_TYPE_INFO, rightType))
+    registeredTimer = getRuntimeContext.getState(
+      new ValueStateDescriptor[JLong](REGISTERED_TIMER_STATE_NAME, 
BasicTypeInfo.LONG_TYPE_INFO))
+
+    collector = new TimestampedCollector[CRow](output)
+    cRowWrapper = new CRowWrappingCollector()
+    cRowWrapper.out = collector
+    cRowWrapper.setChange(true)
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override def processElement1(element: StreamRecord[CRow]): Unit = {
+    checkNotRetraction(element)
+
+    leftState.put(getNextLeftIndex, element.getValue.row)
+    registerSmallestTimer(getLeftTime(element.getValue.row)) // Timer to emit 
and clean up the state
+  }
+
+  override def processElement2(element: StreamRecord[CRow]): Unit = {
+    checkNotRetraction(element)
+
+    val rowTime = getRightTime(element.getValue.row)
+    rightState.put(rowTime, element.getValue.row)
+    registerSmallestTimer(rowTime) // Timer to clean up the state
+  }
+
+  private def registerSmallestTimer(timestamp: Long): Unit = {
+    val currentRegisteredTimer = registeredTimer.value()
+    if (currentRegisteredTimer == null) {
+      registerTimer(timestamp)
+    }
+    else if (currentRegisteredTimer != null && currentRegisteredTimer > 
timestamp) {
+      timerService.deleteEventTimeTimer(currentRegisteredTimer)
+      registerTimer(timestamp)
+    }
+  }
+
+  private def registerTimer(timestamp: Long): Unit = {
+    registeredTimer.update(timestamp)
+    timerService.registerEventTimeTimer(timestamp)
+  }
+
+  override def onProcessingTime(timer: InternalTimer[Any, VoidNamespace]): 
Unit = {
+    throw new IllegalStateException("This should never happen")
+  }
+
+  override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
+    registeredTimer.clear()
+    val lastUnprocessedTime = 
emitResultAndCleanUpState(timerService.currentWatermark())
+    if (lastUnprocessedTime < Long.MaxValue) {
+      registerTimer(lastUnprocessedTime)
+    }
+  }
+
+  /**
+    * @return a row time of the oldest unprocessed probe record or 
Long.MaxValue, if all records
+    *         have been processed.
+    */
+  private def emitResultAndCleanUpState(timerTimestamp: Long): Long = {
+    val rightRowsSorted = getRightRowsSorted(rightRowtimeComparator)
+    var lastUnprocessedTime = Long.MaxValue
+
+    val leftIterator = leftState.entries().iterator()
+    while (leftIterator.hasNext) {
+      val leftEntry = leftIterator.next()
+      val leftRow = leftEntry.getValue
+      val leftTime = getLeftTime(leftRow)
+
+      if (leftTime <= timerTimestamp) {
+        val rightRow = latestRightRowToJoin(rightRowsSorted, leftTime)
+        if (rightRow.isPresent) {
+          joinFunction.join(leftRow, rightRow.get, cRowWrapper)
+        }
+        leftIterator.remove()
+      }
+      else {
+        lastUnprocessedTime = Math.min(lastUnprocessedTime, leftTime)
+      }
+    }
+
+    cleanUpState(timerTimestamp, rightRowsSorted)
+    lastUnprocessedTime
+  }
+
+  /**
+    * Removes all right entries older than the watermark, except the latest 
one. For example with:
+    * rightState = [1, 5, 9]
+    * and
+    * watermark = 6
+    * we cannot remove "5" from rightState, because left elements with 
rowtime of 7 or 8 could
+    * be joined with it later
+    */
+  private def cleanUpState(timerTimestamp: Long, rightRowsSorted: 
util.List[Row]) = {
+    var i = 0
+    val indexToKeep = firstIndexToKeep(timerTimestamp, rightRowsSorted)
+    while (i < indexToKeep) {
+      val rightTime = getRightTime(rightRowsSorted.get(i))
+      rightState.remove(rightTime)
+      i += 1
+    }
+  }
+
+  private def firstIndexToKeep(timerTimestamp: Long, rightRowsSorted: 
util.List[Row]): Int = {
+    val firstIndexNewerThenTimer =
+      indexOfFirstElementNewerThanTimer(timerTimestamp, rightRowsSorted)
+
+    if (firstIndexNewerThenTimer < 0) {
+      rightRowsSorted.size() - 1
+    }
+    else {
+      firstIndexNewerThenTimer - 1
+    }
+  }
+
+  private def indexOfFirstElementNewerThanTimer(
+      timerTimestamp: Long,
+      list: util.List[Row]): Int = {
+    val iter = list.listIterator
+    while (iter.hasNext) {
+      if (getRightTime(iter.next) > timerTimestamp) {
+        return iter.previousIndex
+      }
+    }
+    -1
+  }
+
+  /**
+    * Binary search `rightRowsSorted` to find the latest right row to join 
with `leftTime`.
+    * Latest means a right row with the largest time that is still smaller than or 
equal to `leftTime`.
+    *
+    * @return found element or `Optional.empty` if no such row was found 
(either `rightRowsSorted`
+    *         is empty or all `rightRowsSorted` are newer).
+    */
+  private def latestRightRowToJoin(
+      rightRowsSorted: util.List[Row],
+      leftTime: Long): Optional[Row] = {
+    latestRightRowToJoin(rightRowsSorted, 0, rightRowsSorted.size - 1, 
leftTime)
+  }
+
+  private def latestRightRowToJoin(
+      rightRowsSorted: util.List[Row],
+      low: Int,
+      high: Int,
+      leftTime: Long): Optional[Row] = {
+    if (low > high) {
+      // exact value not found, we are returning the largest of the values 
smaller than leftTime
+      if (low - 1 < 0) {
+        Optional.empty()
+      }
+      else {
+        Optional.of(rightRowsSorted.get(low - 1))
+      }
+    } else {
+      val mid = (low + high) >>> 1
+      val midRow = rightRowsSorted.get(mid)
+      val midTime = getRightTime(midRow)
+      val cmp = midTime.compareTo(leftTime)
+      if (cmp < 0) {
+        latestRightRowToJoin(rightRowsSorted, mid + 1, high, leftTime)
+      }
+      else if (cmp > 0) {
+        latestRightRowToJoin(rightRowsSorted, low, mid - 1, leftTime)
+      }
+      else {
+        Optional.of(midRow)
+      }
+    }
+  }
+
+  private def getRightRowsSorted(rowtimeComparator: RowtimeComparator): 
util.List[Row] = {
+    val rightRows = new util.ArrayList[Row]()
+    for (row <- rightState.values()) {
+      rightRows.add(row)
+    }
+    rightRows.sort(rowtimeComparator)
+    rightRows.asInstanceOf[util.List[Row]]
+  }
+
+  private def getNextLeftIndex: JLong = {
+    var index = nextLeftIndex.value()
+    if (index == null) {
+      index = 0L
+    }
+    nextLeftIndex.update(index + 1)
+    index
+  }
+
+  private def getLeftTime(leftRow: Row): Long = {
+    leftRow.getField(leftTimeAttribute).asInstanceOf[Long]
+  }
+
+  private def getRightTime(rightRow: Row): Long = {
+    rightRow.getField(rightTimeAttribute).asInstanceOf[Long]
+  }
+
+  private def checkNotRetraction(element: StreamRecord[CRow]) = {
+    if (!element.getValue.change) {
+      throw new IllegalStateException(
+        s"Retractions are not supported by 
[${classOf[TemporalRowtimeJoin].getSimpleName}]. " +
+          "If this can happen it should be validated during planning!")
+    }
+  }
+}
+
+class RowtimeComparator(timeAttribute: Int) extends Comparator[Row] with 
Serializable {
+  override def compare(o1: Row, o2: Row): Int = {
+    val o1Time = o1.getField(timeAttribute).asInstanceOf[Long]
+    val o2Time = o2.getField(timeAttribute).asInstanceOf[Long]
+    o1Time.compareTo(o2Time)
+  }
+}
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
index 58ad493566d..9d0420d2e4f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -106,19 +106,4 @@ class TemporalTableJoinValidationTest extends 
TableTestBase {
 
     util.explain(result)
   }
-
-  @Test
-  def testEventTimeIndicators(): Unit = {
-    expectedException.expect(classOf[TableException])
-    expectedException.expectMessage(
-      "Event time temporal joins are not yet supported.")
-
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
-
-    val result = orders
-      .join(rates('o_rowtime), "currency = o_currency")
-      .select("o_amount * rate").as("rate")
-
-    util.explain(result)
-  }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index dd6e00e73bc..f9220e68f0f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.Future
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, 
HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator
@@ -58,7 +59,7 @@ import scala.collection.mutable
   */
 abstract class ExpressionTestBase {
 
-  private val testExprs = mutable.ArrayBuffer[(RexNode, String)]()
+  private val testExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
 
   // setup test utils
   private val tableName = "testTable"
@@ -118,7 +119,7 @@ abstract class ExpressionTestBase {
     val generator = new FunctionCodeGenerator(config, false, typeInfo)
 
     // cast expressions to String
-    val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, 
VARCHAR))
+    val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._2, 
VARCHAR))
 
     // generate code
     val resultType = new 
RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*)
@@ -170,10 +171,10 @@ abstract class ExpressionTestBase {
     testExprs
       .zipWithIndex
       .foreach {
-        case ((expr, expected), index) =>
+        case ((originalExpr, optimizedExpr, expected), index) =>
           val actual = result.getField(index)
           assertEquals(
-            s"Wrong result for: $expr",
+            s"Wrong result for: [$originalExpr] optimized to: 
[$optimizedExpr]",
             expected,
             if (actual == null) "null" else actual)
       }
@@ -185,70 +186,35 @@ abstract class ExpressionTestBase {
     val validated = planner.validate(parsed)
     val converted = planner.rel(validated).rel
 
-    val decorPlan = RelDecorrelator.decorrelateQuery(converted)
-
-    // normalize
-    val normalizedPlan = if 
(FlinkRuleSets.DATASET_NORM_RULES.iterator().hasNext) {
-      val planner = hepPlanner
-      planner.setRoot(decorPlan)
-      planner.findBestExp
-    } else {
-      decorPlan
-    }
-
-    // convert to logical plan
-    val logicalProps = 
converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, 
normalizedPlan, logicalProps,
-      ImmutableList.of(), ImmutableList.of())
-
-    // convert to dataset plan
-    val physicalProps = 
converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
-    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, 
logicalCalc, physicalProps,
-      ImmutableList.of(), ImmutableList.of())
+    val env = context._2.asInstanceOf[BatchTableEnvironment]
+    val optimized = env.optimize(converted)
 
     // throw exception if plan contains more than a calc
-    if (!dataSetCalc.getInput(0).isInstanceOf[DataSetScan]) {
+    if (!optimized.getInput(0).isInstanceOf[DataSetScan]) {
       fail("Expression is converted into more than a Calc operation. Use a 
different test method.")
     }
 
-    // extract RexNode
-    val calcProgram = dataSetCalc
-     .asInstanceOf[DataSetCalc]
-     .getProgram
-    val expanded = 
calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
-
-    testExprs += ((expanded, expected))
+    testExprs += ((sqlExpr, extractRexNode(optimized), expected))
   }
 
   private def addTableApiTestExpr(tableApiExpr: Expression, expected: String): 
Unit = {
     // create RelNode from Table API expression
-    val env = context._2
+    val env = context._2.asInstanceOf[BatchTableEnvironment]
     val converted = env
-      .asInstanceOf[BatchTableEnvironment]
       .scan(tableName)
       .select(tableApiExpr)
       .getRelNode
 
-    // create DataSetCalc
-    val decorPlan = RelDecorrelator.decorrelateQuery(converted)
-
-    // convert to logical plan
-    val flinkLogicalProps = 
converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, decorPlan, 
flinkLogicalProps,
-      ImmutableList.of(), ImmutableList.of())
+    val optimized = env.optimize(converted)
 
-    // convert to dataset plan
-    val flinkPhysicalProps = 
converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
-    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, 
logicalCalc, flinkPhysicalProps,
-      ImmutableList.of(), ImmutableList.of())
-
-    // extract RexNode
-    val calcProgram = dataSetCalc
-     .asInstanceOf[DataSetCalc]
-     .getProgram
-    val expanded = 
calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
+    testExprs += ((tableApiExpr.toString, extractRexNode(optimized), expected))
+  }
 
-    testExprs += ((expanded, expected))
+  private def extractRexNode(node: RelNode): RexNode = {
+    val calcProgram = node
+      .asInstanceOf[DataSetCalc]
+      .getProgram
+    calcProgram.expandLocalRef(calcProgram.getProjectList.get(0))
   }
 
   private def addTableApiTestExpr(tableApiString: String, expected: String): 
Unit = {
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
index 43f35482f23..0d46db41970 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.runtime.harness
 
+import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
@@ -27,8 +28,8 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
@@ -41,9 +42,14 @@ import org.apache.flink.table.runtime.CRowKeySelector
 import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
 TestStreamQueryConfig}
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.hamcrest.Matchers.startsWith
+import org.hamcrest.{CoreMatchers, Matcher}
+import org.hamcrest.Matchers.{endsWith, startsWith}
+import org.junit.Assert.assertTrue
 import org.junit.Test
 
+import scala.collection.mutable.ArrayBuffer
+import collection.JavaConverters._
+
 class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
@@ -57,14 +63,18 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val ORDERS_PROCTIME = "o_proctime"
 
+  private val ORDERS_ROWTIME = "o_rowtime"
+
   private val RATES_KEY = "r_currency"
 
+  private val RATES_ROWTIME = "r_rowtime"
+
   private val ordersRowtimeType = new RowTypeInfo(
     Array[TypeInformation[_]](
       Types.LONG,
       Types.STRING,
       TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
-    Array("o_amount", ORDERS_KEY, "o_rowtime"))
+    Array("o_amount", ORDERS_KEY, ORDERS_ROWTIME))
 
   private val ordersProctimeType = new RowTypeInfo(
     Array[TypeInformation[_]](
@@ -78,7 +88,7 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
       Types.STRING,
       Types.LONG,
       TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
-    Array(RATES_KEY, "r_rate", "r_rowtime"))
+    Array(RATES_KEY, "r_rate", RATES_ROWTIME))
 
   private val ratesProctimeType = new RowTypeInfo(
     Array[TypeInformation[_]](
@@ -93,6 +103,186 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val rexBuilder = new RexBuilder(typeFactory)
 
+  @Test
+  def testRowtime() {
+    val testHarness = createTestHarness(new 
OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // process without conversion rates
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L)))
+
+    // process (out of order) with conversion rates
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 2L)))
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 2L, "Euro", 114L, 
1L)))
+
+    // initiate conversion rates
+    testHarness.processElement2(new StreamRecord(CRow("US Dollar", 102L, 1L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 1L)))
+    testHarness.processElement2(new StreamRecord(CRow("Yen", 1L, 1L)))
+
+    // process again without conversion rates
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L)))
+
+    // process with conversion rates
+    testHarness.processElement1(new StreamRecord(CRow(1L, "US Dollar", 3L)))
+    testHarness.processElement1(new StreamRecord(CRow(50L, "Yen", 4L)))
+    expectedOutput.add(new StreamRecord(CRow(1L, "US Dollar", 3L, "US Dollar", 
102L, 1L)))
+    expectedOutput.add(new StreamRecord(CRow(50L, "Yen", 4L, "Yen", 1L, 1L)))
+
+    // update Euro #1
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, 5L)))
+
+    // process with old Euro
+    testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", 4L)))
+    expectedOutput.add(new StreamRecord(CRow(3L, "Euro", 4L, "Euro", 114L, 
1L)))
+
+    // again update Euro #2
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 119L, 7L)))
+
+    // process with updated Euro #1
+    testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", 5L)))
+    expectedOutput.add(new StreamRecord(CRow(3L, "Euro", 5L, "Euro", 116L, 
5L)))
+
+    // process US Dollar
+    testHarness.processElement1(new StreamRecord(CRow(5L, "US Dollar", 7L)))
+    expectedOutput.add(new StreamRecord(CRow(5L, "US Dollar", 7L, "US Dollar", 
102L, 1L)))
+
+    assertTrue(testHarness.getOutput.isEmpty)
+    testHarness.processWatermark1(new Watermark(10L))
+    assertTrue(testHarness.getOutput.isEmpty)
+    testHarness.processWatermark2(new Watermark(10L))
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testEventsWithSameRowtime() {
+    val testHarness = createTestHarness(new 
OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    val time1 = 1L
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 112L, time1)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, time1)))
+
+    val time2 = 2L
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", time2)))
+    testHarness.processElement1(new StreamRecord(CRow(22L, "Euro", time2)))
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", time2, "Euro", 114L, 
time1)))
+    expectedOutput.add(new StreamRecord(CRow(22L, "Euro", time2, "Euro", 114L, 
time1)))
+
+    testHarness.processWatermark1(new Watermark(time2))
+    testHarness.processWatermark2(new Watermark(time2))
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testRowtimePickCorrectRowFromTemporalTable() {
+    val testHarness = createTestHarness(new 
OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+
+    val expectedOutput = processEuro(testHarness)
+
+    testHarness.processWatermark1(new Watermark(10L))
+    testHarness.processWatermark2(new Watermark(10L))
+    verify(new util.LinkedList(expectedOutput.asJava), testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testRowtimeWatermarkHandling() {
+    val testHarness = createTestHarness(new 
OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+
+    val expectedOutput = processEuro(testHarness)
+
+    testHarness.processWatermark1(new Watermark(3L))
+    testHarness.processWatermark2(new Watermark(2L))
+    verify(new util.LinkedList(expectedOutput.slice(0, 2).asJava), 
testHarness.getOutput)
+
+    testHarness.processWatermark1(new Watermark(12L))
+    testHarness.processWatermark2(new Watermark(5L))
+    verify(new util.LinkedList(expectedOutput.slice(0, 5).asJava), 
testHarness.getOutput)
+
+    testHarness.processWatermark2(new Watermark(10L))
+    verify(new util.LinkedList(expectedOutput.asJava), testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  /**
+    * Cleaning up the state when processing watermark exceeding all events 
should always keep
+    * one latest event in TemporalTable.
+    */
+  @Test
+  def testRowtimeStateCleanUpShouldAlwaysKeepOneLatestRow() {
+    val testHarness = createTestHarness(new 
OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+
+    val expectedOutput = processEuro(testHarness)
+
+    testHarness.processWatermark1(new Watermark(9999L))
+    testHarness.processWatermark2(new Watermark(9999L))
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10000L)))
+
+    testHarness.processWatermark1(new Watermark(10000L))
+    testHarness.processWatermark2(new Watermark(10000L))
+
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 10000L, "Euro", 9L, 
9L))
+    verify(new util.LinkedList(expectedOutput.asJava), testHarness.getOutput)
+    
+    testHarness.close()
+  }
+
+  def processEuro(
+    testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, 
CRow])
+  : ArrayBuffer[Object] = {
+
+    // process conversion rates
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 1L, 1L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 3L, 3L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 5L, 5L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 7L, 7L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 9L, 9L)))
+
+    // process orders
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 2L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 3L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 8L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 9L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))
+
+    var expectedOutput = new ArrayBuffer[Object]()
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 1L, 1L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 2L, "Euro", 1L, 1L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 3L, "Euro", 3L, 3L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 3L, 3L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 5L, 5L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 5L, 5L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 7L, "Euro", 7L, 7L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 8L, "Euro", 7L, 7L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 9L, "Euro", 9L, 9L))
+    expectedOutput += new StreamRecord(CRow(2L, "Euro", 10L, "Euro", 9L, 9L))
+    expectedOutput
+  }
+
   @Test
   def testProctime() {
     val testHarness = createTestHarness(new 
OrdersRatesProctimeTemporalJoinInfo)
@@ -276,7 +466,10 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   @Test
   def testUnsupportedPrimaryKeyInTemporalJoinCondition() {
     expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage(startsWith("Unsupported right primary key 
expression"))
+    expectedException.expectMessage(
+      CoreMatchers.allOf[String](
+        startsWith("Unsupported expression"),
+        endsWith("Expected input reference")))
 
     translateJoin(
       new OrdersRatesProctimeTemporalJoinInfo() {
@@ -324,14 +517,11 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   def createTestHarness(temporalJoinInfo: TemporalJoinInfo)
     : KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = {
 
-    val (leftKeySelector, rightKeySelector, joinCoProcessFunction) =
+    val (leftKeySelector, rightKeySelector, joinOperator) =
       translateJoin(temporalJoinInfo)
 
-    val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
-      new KeyedCoProcessOperator[String, CRow, CRow, 
CRow](joinCoProcessFunction)
-
     new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
-      operator,
+      joinOperator,
       leftKeySelector.asInstanceOf[KeySelector[CRow, String]],
       rightKeySelector.asInstanceOf[KeySelector[CRow, String]],
       BasicTypeInfo.STRING_TYPE_INFO,
@@ -341,7 +531,7 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   }
 
   def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = 
JoinRelType.INNER)
-    : (CRowKeySelector, CRowKeySelector, CoProcessFunction[CRow, CRow, CRow]) 
= {
+    : (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, 
CRow]) = {
 
     val leftType = joinInfo.leftRowType
     val rightType = joinInfo.rightRowType
@@ -358,7 +548,7 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
       joinInfo,
       rexBuilder)
 
-    val joinCoProcessFunction = joinTranslator.getCoProcessFunction(
+    val joinOperator = joinTranslator.getJoinOperator(
       joinRelType,
       joinType.getFieldNames,
       "TemporalJoin",
@@ -366,7 +556,7 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
     (joinTranslator.getLeftKeySelector(),
       joinTranslator.getRightKeySelector(),
-      joinCoProcessFunction)
+      joinOperator)
   }
 
   abstract class TemporalJoinInfo(
@@ -426,6 +616,36 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
     }
   }
 
+  class OrdersRatesRowtimeTemporalJoinInfo()
+    extends RowtimeTemporalJoinInfo(
+      ordersRowtimeType,
+      ratesRowtimeType,
+      ORDERS_KEY,
+      RATES_KEY,
+      ORDERS_ROWTIME,
+      RATES_ROWTIME)
+
+  class RowtimeTemporalJoinInfo(
+      leftRowType: RowTypeInfo,
+      rightRowType: RowTypeInfo,
+      leftKey: String,
+      rightKey: String,
+      leftTimeAttribute: String,
+      rightTimeAttribute: String)
+    extends TemporalJoinInfo(
+      leftRowType,
+      rightRowType,
+      leftKey,
+      rightKey) {
+    override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+      LogicalTemporalTableJoin.makeRowTimeTemporalJoinConditionCall(
+        rexBuilder,
+        makeLeftInputRef(leftTimeAttribute),
+        makeRightInputRef(rightTimeAttribute),
+        makeRightInputRef(rightKey))
+    }
+  }
+
   class MissingTemporalJoinConditionJoinInfo(
       leftRowType: RowTypeInfo,
       rightRowType: RowTypeInfo,
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
index b0304a3d116..df0f01be0d5 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -22,11 +22,14 @@ import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 
 import scala.collection.mutable
@@ -91,9 +94,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
 
   @Test
   def testEventTimeInnerJoin(): Unit = {
-    expectedException.expect(classOf[TableException])
-    expectedException.expectMessage("Event time temporal joins are not yet 
supported")
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStateBackend(getStateBackend)
@@ -111,15 +111,33 @@ class TemporalJoinITCase extends 
StreamingWithStateTestBase {
         |WHERE r.currency = o.currency
         |""".stripMargin
 
+
     val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
+    ordersData.+=((2L, "Euro", new Timestamp(2L)))
+    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
+    ordersData.+=((50L, "Yen", new Timestamp(4L)))
+    ordersData.+=((3L, "Euro", new Timestamp(5L)))
 
     val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
+    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
+
+    var expectedOutput = new mutable.HashSet[String]()
+    expectedOutput += (2 * 114).toString
+    expectedOutput += (1 * 102).toString
+    expectedOutput += (50 * 1).toString
+    expectedOutput += (3 * 116).toString
 
     val orders = env
       .fromCollection(ordersData)
+      .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
       .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
     val ratesHistory = env
       .fromCollection(ratesHistoryData)
+      .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
       .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
 
     tEnv.registerTable("Orders", orders)
@@ -131,5 +149,14 @@ class TemporalJoinITCase extends 
StreamingWithStateTestBase {
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
+
+    assertEquals(expectedOutput, StreamITCase.testResults.toSet)
+  }
+}
+
+class TimestampExtractor[T1, T2]
+  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, 
Timestamp)](Time.seconds(10))  {
+  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
+    element._3.getTime
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---------------------------------------
>
>                 Key: FLINK-9715
>                 URL: https://issues.apache.org/jira/browse/FLINK-9715
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to