[ https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hequn Cheng updated FLINK-8898:
-------------------------------
    Description: 
Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators in the conditions of LogicalFilter, which leads to type mismatch exceptions.

We can reproduce the exception with the following test case.
{code:java}
@Test
def reproduceTypeMissmatch(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStateBackend(getStateBackend)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  StreamITCase.clear

  val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
  data1.+=((1, 1L, 1, 1L))
  data1.+=((1, 2L, 1, 1L))

  // eventTs is declared as the rowtime attribute, i.e. a time indicator.
  val t1 = env.fromCollection(data1)
    .assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
    .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
  tEnv.registerTable("myTable", t1)

  val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition by id order by eventTs rows" +
    " between 100 preceding and current row) as cnt1 from myTable"
  val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over (partition by id " +
    "order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"

  val left = tEnv.sqlQuery(sql1)
  val right = tEnv.sqlQuery(sql2)

  // The where() condition compares the two time indicator fields.
  left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
    .writeToSink(new TestRetractSink, queryConfig)
  env.execute()
}
{code}

  was:Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators in the conditions of LogicalFilter, which leads to type mismatch exceptions.

> Materialize time indicators in conditions of LogicalFilter
> ----------------------------------------------------------
>
>                 Key: FLINK-8898
>                 URL: https://issues.apache.org/jira/browse/FLINK-8898
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Hequn Cheng
>            Priority: Major
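
For readers unfamiliar with the term, the following is a minimal toy sketch in plain Scala of what materializing time indicators inside a filter condition could look like. It is not Flink or Calcite code: every name in it ({{MaterializeSketch}}, {{Field}}, {{Materialized}}, {{materialize}}, {{hasBareIndicator}}) is hypothetical and exists only for illustration. It assumes that materializing means replacing a reference to a time indicator field with a regular timestamp value before the condition is evaluated.

```scala
// Toy model only: none of these types are Flink or Calcite classes.
object MaterializeSketch {
  sealed trait FieldType
  case object TimeIndicator extends FieldType // logical rowtime/proctime marker
  case object IntType extends FieldType       // ordinary column type

  sealed trait Expr
  final case class Field(name: String, tpe: FieldType) extends Expr
  // A time indicator that has been turned into a regular timestamp value.
  final case class Materialized(field: Field) extends Expr
  final case class Equals(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Walk the condition and wrap every bare time indicator reference.
  def materialize(e: Expr): Expr = e match {
    case f @ Field(_, TimeIndicator) => Materialized(f)
    case Equals(l, r)                => Equals(materialize(l), materialize(r))
    case And(l, r)                   => And(materialize(l), materialize(r))
    case other                       => other
  }

  // True if the condition still references an unmaterialized time indicator.
  def hasBareIndicator(e: Expr): Boolean = e match {
    case Field(_, TimeIndicator) => true
    case Equals(l, r)            => hasBareIndicator(l) || hasBareIndicator(r)
    case And(l, r)               => hasBareIndicator(l) || hasBareIndicator(r)
    case _                       => false
  }

  // Mirrors the shape of the condition "id = r_id && eventTs === r_eventTs".
  val condition: Expr = And(
    Equals(Field("id", IntType), Field("r_id", IntType)),
    Equals(Field("eventTs", TimeIndicator), Field("r_eventTs", TimeIndicator)))

  def main(args: Array[String]): Unit = {
    println(hasBareIndicator(condition))              // true
    println(hasBareIndicator(materialize(condition))) // false
  }
}
```

In this toy model, a rewrite that visits only projections and skips the condition would leave the bare {{TimeIndicator}} references in place, which is the kind of gap the description reports for LogicalFilter.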