[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050535#comment-16050535 ]
Fabian Hueske commented on FLINK-6886:
--------------------------------------

Maybe there's another way to fix this problem. I played around a bit and found the following:

The following Table API query is executed correctly:

{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)

val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)
val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO
class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}

whereas the equivalent SQL query fails with the reported exception ("The field types of physical and logical row types do not match"):

{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
    |  TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
    |  n,
    |  COUNT(i) as cnt
    |FROM $table
    |GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
    |""".stripMargin)
val results = sqlTable.toAppendStream[MP](queryConfig)
{code}

The plans of both queries look similar, but the SQL plan seems to lack the correct final projection:

{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
    LogicalProject(n=[$2], i=[$1], proctime=[$3])
      LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, end('w) AS TMP_2])
    DataStreamCalc(select=[n, i, proctime])
      DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plans
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
    LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
      LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, end('w$) AS w$end])
    DataStreamCalc(select=[n, proctime, i])
      DataStreamScan(table=[[_DataStreamTable_0]])
{code}

So this doesn't seem to be a fundamental issue with the time attributes or window properties, but rather an issue with the SQL optimization.

What do you think, [~sunjincheng121] and [~jark]?

> Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6886
>                 URL: https://issues.apache.org/jira/browse/FLINK-6886
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause contains a `Timestamp` field and the `T` in toDataStream[T] is not a `Row` type (for example a `PojoType`), an exception is thrown. This JIRA will fix this bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax,
>   TUMBLE_START(rowtime, INTERVAL '5' SECOND) as winStart,
>   TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd
> FROM T1
> GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Throw Exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match.This is a bug and should not happen. Please file an issue.
>       at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>       at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>       at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>       at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, further exceptions are thrown. The real cause is a bug in the {{TableEnvironment#generateRowConverterFunction}} method, so this JIRA will fix it.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)