Hequn Cheng created FLINK-11916:

             Summary: Join with a Temporal Table should throw exception for left join
                 Key: FLINK-11916
                 URL: https://issues.apache.org/jira/browse/FLINK-11916
             Project: Flink
          Issue Type: Bug
          Components: API / Table SQL
            Reporter: Hequn Cheng

In {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner join
to a left join, the test still works fine. We may need to throw an exception if we
only support inner joins.

CC [~pnowojski]

The problem can be reproduced with the following test (note the left join in the SQL query):
  def testEventTimeInnerJoin(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val sqlQuery =
        |  o.amount * r.rate AS amount
        |  Orders AS o left join
        |  LATERAL TABLE (Rates(o.rowtime)) AS r on true
        |WHERE r.currency = o.currency

    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))

    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))

    var expectedOutput = new mutable.HashSet[String]()
    expectedOutput += (2 * 114).toString
    expectedOutput += (3 * 116).toString

    val orders = env
      .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
    val ratesHistory = env
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
tEnv.scan("RatesHistory").filter('rate > 110L))
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    // Scan from registered table to test for interplay between
    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])

    assertEquals(expectedOutput, StreamITCase.testResults.toSet)

This message was sent by Atlassian JIRA

Reply via email to