[ 
https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-8898:
-------------------------------
    Description: 
Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators in 
conditions of LogicalFilter, which leads to type mismatch exceptions. We can 
reproduce the exception with the following test case.
{code:java}
@Test
def reproduceTypeMissmatch(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStateBackend(getStateBackend)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
StreamITCase.clear

val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
data1.+=((1, 1L, 1, 1L))
data1.+=((1, 2L, 1, 1L))

val t1 = env.fromCollection(data1)
.assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
.toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
tEnv.registerTable("myTable", t1)

val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition by 
id order by eventTs rows" +
" between 100 preceding and current row) as cnt1 from myTable"
val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over 
(partition by id " +
"order by eventTs rows between 50 preceding and current row) as cnt2 from 
myTable"
val left = tEnv.sqlQuery(sql1)
val right = tEnv.sqlQuery(sql2)
left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
.writeToSink(new TestRetractSink, queryConfig)

env.execute()
}

{code}

  was:Currently, {{RelTimeIndicatorConverter}} does not materialize time 
indicators in conditions of LogicalFilter, which leads to type mismatch exceptions.


> Materialize time indicators in conditions of LogicalFilter
> ----------------------------------------------------------
>
>                 Key: FLINK-8898
>                 URL: https://issues.apache.org/jira/browse/FLINK-8898
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Hequn Cheng
>            Priority: Major
>
> Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators 
> in conditions of LogicalFilter, which leads to type mismatch exceptions. We can 
> reproduce the exception with the following test case.
> {code:java}
> @Test
> def reproduceTypeMissmatch(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> env.setStateBackend(getStateBackend)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> StreamITCase.clear
> val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
> data1.+=((1, 1L, 1, 1L))
> data1.+=((1, 2L, 1, 1L))
> val t1 = env.fromCollection(data1)
> .assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
> .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
> tEnv.registerTable("myTable", t1)
> val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition 
> by id order by eventTs rows" +
> " between 100 preceding and current row) as cnt1 from myTable"
> val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over 
> (partition by id " +
> "order by eventTs rows between 50 preceding and current row) as cnt2 from 
> myTable"
> val left = tEnv.sqlQuery(sql1)
> val right = tEnv.sqlQuery(sql2)
> left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
> .writeToSink(new TestRetractSink, queryConfig)
> env.execute()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to