[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052154#comment-16052154 ]

Fabian Hueske edited comment on FLINK-6886 at 6/16/17 5:29 PM:
---------------------------------------------------------------

You are right. The problem is in {{translate(dataStreamPlan, 
relNode.getRowType, queryConfig, withChangeFlag)}}.

We use the {{RowType}} of the original input plan because the field names might change during optimization (Calcite prunes pure renaming projections as no-ops). However, the {{RelTimeIndicatorConverter}} (correctly!) changes the types of time indicator fields, so the field types of the optimized plan are not identical to those of the original plan. This difference causes the exception.

A simple solution is to merge the field names of the original plan with the field types of the optimized plan and construct a new {{RelDataType}} from them. I changed the {{StreamTableEnvironment.translate()}} method as follows:

{code}
protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {

  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)

  // zip original field names with optimized field types
  val fields = relNode.getRowType.getFieldList.asScala
    .zip(dataStreamPlan.getRowType.getFieldList.asScala)
    // keep the name of the original plan and the type of the optimized plan
    .map(f => (f._1.getName, f._2.getType))
    // add the field index
    .zipWithIndex
    // build the new fields
    .map { case ((name, fieldType), idx) => new RelDataTypeFieldImpl(name, idx, fieldType) }

  // build a record type from the list of fields
  val rowType = new RelRecordType(
    fields.toList.asJava.asInstanceOf[_root_.java.util.List[RelDataTypeField]])

  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
{code}

That got the query (and all tests) working.

The field merging can certainly be done more elegantly, though.
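For illustration, here is the same name/type merge sketched in plain Scala, using a hypothetical {{Field}} case class as a stand-in for Calcite's {{RelDataTypeFieldImpl}} (this is not the Flink/Calcite API, just the merging idea in isolation):

{code}
// Hypothetical stand-in for Calcite's RelDataTypeField: a field has a
// name, an index, and a type (modeled here as a plain String label).
case class Field(name: String, index: Int, fieldType: String)

// Merge: field names come from the original plan, field types from the
// optimized plan; the combined fields are re-indexed from zero.
def mergeFields(original: Seq[Field], optimized: Seq[Field]): Seq[Field] =
  original.zip(optimized).zipWithIndex.map {
    case ((orig, opt), idx) => Field(orig.name, idx, opt.fieldType)
  }

// Example: the optimizer changed the type of the time indicator field,
// but the field order is unchanged.
val originalPlan  = Seq(Field("name", 0, "VARCHAR"), Field("rowtime", 1, "TIMESTAMP"))
val optimizedPlan = Seq(Field("name", 0, "VARCHAR"), Field("rowtime", 1, "TIME_INDICATOR"))

val merged = mergeFields(originalPlan, optimizedPlan)
// merged: Seq(Field("name", 0, "VARCHAR"), Field("rowtime", 1, "TIME_INDICATOR"))
{code}

The pattern-matching {{map}} avoids the nested tuple accessors ({{x._1._1}} etc.) of the version above.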


was (Author: fhueske):
You are right. The problem is in {{translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)}}.

> Fix Timestamp field can not be selected in event time case when  
> toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6886
>                 URL: https://issues.apache.org/jira/browse/FLINK-6886
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause 
> contains a `Timestamp` field and {{toDataStream[T]}} is called with `T` not 
> being a `Row` type (such as a `PojoType`), an exception is thrown. This JIRA 
> will fix that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax,
>   TUMBLE_START(rowtime, INTERVAL '5' SECOND) as winStart,
>   TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd
> FROM T1
> GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Thrown exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match. This is a bug and should not happen. Please file an issue.
>       at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>       at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>       at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>       at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, subsequent other exceptions are 
> thrown. The real cause is a bug in the 
> {{TableEnvironment#generateRowConverterFunction}} method, so this JIRA will 
> fix that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
