[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052154#comment-16052154 ]
Fabian Hueske edited comment on FLINK-6886 at 6/16/17 5:29 PM:
---------------------------------------------------------------

You are right. The problem is in {{translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)}}. We use the RowType of the original input plan because the field names might change (Calcite prunes pure renaming projections as no-ops). However, the {{RelTimeIndicatorConverter}} (correctly!) changes the types of time indicators. So, the types of the optimized plan are not identical to those of the original plan. This difference causes the exception.

A simple solution would be to just merge the field names of the original plan with the field types of the optimized plan and construct a new {{RelDataType}}.

I changed the {{StreamTableEnvironment.translate()}} method to this:

{code}
protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)

  // zip original fields (for their names) with optimized fields (for their types)
  val mergedFields = relNode.getRowType.getFieldList.asScala
    .zip(dataStreamPlan.getRowType.getFieldList.asScala)
    // get name of original plan and type of optimized plan
    .map { case (original, optimized) => (original.getName, optimized.getType) }
    // add index
    .zipWithIndex
    // build new fields from name, index, and type
    .map { case ((name, fieldType), idx) => new RelDataTypeFieldImpl(name, idx, fieldType) }

  // build a record type from list of fields
  val rowType = new RelRecordType(
    mergedFields.toList.asJava.asInstanceOf[_root_.java.util.List[RelDataTypeField]])

  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
{code}

and got it (and all tests) working. The field merging can be done a lot nicer.


was (Author: fhueske):
You are right. The problem is in {{translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)}}

> Fix Timestamp field can not be selected in event time case when
> toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6886
>                 URL: https://issues.apache.org/jira/browse/FLINK-6886
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause
> contains a `Timestamp` field and toDataStream[T] is called with a `T` that is
> not a `Row` type (such as a `PojoType`), an exception is thrown. This JIRA
> will fix this bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax,
>   TUMBLE_START(rowtime, INTERVAL '5' SECOND) as winStart,
>   TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd
> FROM T1
> GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Thrown exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match.This is a bug and should not happen. Please file an issue.
>       at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>       at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>       at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>       at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, other exceptions are thrown
> subsequently. The real cause is a bug in the
> {{TableEnvironment#generateRowConverterFunction}} method, so this JIRA will
> fix it.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)