Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r182427888
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
 ---
    @@ -54,238 +44,51 @@ class NonWindowInnerJoin(
         genJoinFuncName: String,
         genJoinFuncCode: String,
         queryConfig: StreamQueryConfig)
    -  extends CoProcessFunction[CRow, CRow, CRow]
    -  with Compiler[FlatJoinFunction[Row, Row, Row]]
    -  with Logging {
    -
    -  // check if input types implement proper equals/hashCode
    -  validateEqualsHashCode("join", leftType)
    -  validateEqualsHashCode("join", rightType)
    -
    -  // state to hold left stream element
    -  private var leftState: MapState[Row, JTuple2[Int, Long]] = _
    -  // state to hold right stream element
    -  private var rightState: MapState[Row, JTuple2[Int, Long]] = _
    -  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
    -
    -  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
    -  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
    -  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
    -
    -  // state to record last timer of left stream, 0 means no timer
    -  private var leftTimer: ValueState[Long] = _
    -  // state to record last timer of right stream, 0 means no timer
    -  private var rightTimer: ValueState[Long] = _
    -
    -  // other condition function
    -  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +  extends NonWindowJoin(
    +    leftType,
    +    rightType,
    +    resultType,
    +    genJoinFuncName,
    +    genJoinFuncCode,
    +    queryConfig) {
     
       override def open(parameters: Configuration): Unit = {
    -    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    -                s"Code:\n$genJoinFuncCode")
    -    val clazz = compile(
    -      getRuntimeContext.getUserCodeClassLoader,
    -      genJoinFuncName,
    -      genJoinFuncCode)
    -    LOG.debug("Instantiating JoinFunction.")
    -    joinFunction = clazz.newInstance()
    -
    -    // initialize left and right state, the first element of tuple2 
indicates how many rows of
    -    // this row, while the second element represents the expired time of 
this row.
    -    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
    -    val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
    -      "left", leftType, tupleTypeInfo)
    -    val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
    -      "right", rightType, tupleTypeInfo)
    -    leftState = getRuntimeContext.getMapState(leftStateDescriptor)
    -    rightState = getRuntimeContext.getMapState(rightStateDescriptor)
    -
    -    // initialize timer state
    -    val valueStateDescriptor1 = new 
ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    -    leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
    -    val valueStateDescriptor2 = new 
ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    -    rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
    -
    -    cRowWrapper = new CRowWrappingMultiOutputCollector()
    -  }
    -
    -  /**
    -    * Process left stream records
    -    *
    -    * @param valueC The input value.
    -    * @param ctx    The ctx to register timer or get current time
    -    * @param out    The collector for returning result values.
    -    *
    -    */
    -  override def processElement1(
    -      valueC: CRow,
    -      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -      out: Collector[CRow]): Unit = {
    -
    -    processElement(valueC, ctx, out, leftTimer, leftState, rightState, 
isLeft = true)
    -  }
    -
    -  /**
    -    * Process right stream records
    -    *
    -    * @param valueC The input value.
    -    * @param ctx    The ctx to register timer or get current time
    -    * @param out    The collector for returning result values.
    -    *
    -    */
    -  override def processElement2(
    -      valueC: CRow,
    -      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -      out: Collector[CRow]): Unit = {
    -
    -    processElement(valueC, ctx, out, rightTimer, rightState, leftState, 
isLeft = false)
    -  }
    -
    -
    -  /**
    -    * Called when a processing timer trigger.
    -    * Expire left/right records which are expired in left and right state.
    -    *
    -    * @param timestamp The timestamp of the firing timer.
    -    * @param ctx       The ctx to register timer or get current time
    -    * @param out       The collector for returning result values.
    -    */
    -  override def onTimer(
    -      timestamp: Long,
    -      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    -      out: Collector[CRow]): Unit = {
    -
    -    if (stateCleaningEnabled && leftTimer.value == timestamp) {
    -      expireOutTimeRow(
    -        timestamp,
    -        leftState,
    -        leftTimer,
    -        ctx
    -      )
    -    }
    -
    -    if (stateCleaningEnabled && rightTimer.value == timestamp) {
    -      expireOutTimeRow(
    -        timestamp,
    -        rightState,
    -        rightTimer,
    -        ctx
    -      )
    -    }
    -  }
    -
    -  def getNewExpiredTime(
    -      curProcessTime: Long,
    -      oldExpiredTime: Long): Long = {
    -
    -    if (stateCleaningEnabled && curProcessTime + minRetentionTime > 
oldExpiredTime) {
    -      curProcessTime + maxRetentionTime
    -    } else {
    -      oldExpiredTime
    -    }
    +    super.open(parameters)
    +    LOG.debug("Instantiating NonWindowInnerJoin.")
       }
     
       /**
         * Puts or Retract an element from the input stream into state and 
search the other state to
         * output records meet the condition. Records will be expired in state 
if state retention time
         * has been specified.
         */
    -  def processElement(
    +  override def processElement(
           value: CRow,
           ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
           out: Collector[CRow],
           timerState: ValueState[Long],
    -      currentSideState: MapState[Row, JTuple2[Int, Long]],
    -      otherSideState: MapState[Row, JTuple2[Int, Long]],
    +      currentSideState: MapState[Row, JTuple2[Long, Long]],
    +      otherSideState: MapState[Row, JTuple2[Long, Long]],
           isLeft: Boolean): Unit = {
     
         val inputRow = value.row
    +    val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, 
currentSideState)
    --- End diff --
    
    We should not use Scala sugar in runtime classes. This might create an 
tuple object for every processed element.


---

Reply via email to