Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132854238
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff ---
    
    I think we should avoid implicit defaults such as using the timestamp attribute of the left-most table (the left-most table might not have a time indicator attribute, and join order optimization could change the order of the tables), as well as special cases for queries like `SELECT *`.
    
    When a `Table` is converted into a `DataStream`, the resulting stream is likely to be further processed by logic that cannot be expressed in SQL or the Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make an explicit choice for the `StreamRecord` timestamp, because the semantics of all subsequent time-based operations depend on it. I see two ways to do that:
    - ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP` (sketched below)
    - let the user specify which field should be used as the timestamp via an additional parameter of the `toAppendStream` and `toRetractStream` methods.
    
    We could also do both.
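    To make the first option concrete, here is a minimal sketch (not part of this PR) of how a user could demote all but one time indicator before the conversion. It assumes a `StreamTableEnvironment` `tEnv` with registered tables `Orders` and `Shipments` that both expose a `rowtime` attribute; the SQL entry point may be called `sql` instead of `sqlQuery` depending on the Flink version:
    
    ```scala
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    
    // Orders and Shipments both carry a rowtime attribute. Casting s.rowtime to
    // TIMESTAMP removes its time indicator property, so only one rowtime attribute
    // is left to back the StreamRecord timestamp after the conversion.
    val result: Table = tEnv.sqlQuery(
      """
        |SELECT o.rowtime, o.orderId, CAST(s.rowtime AS TIMESTAMP) AS shipTime
        |FROM Orders AS o JOIN Shipments AS s
        |  ON o.orderId = s.orderId
        |  AND o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime
      """.stripMargin)
    
    // exactly one time indicator remains, so the conversion is unambiguous
    val stream: DataStream[Row] = result.toAppendStream[Row]
    ```
    
    With the cast in place, the remaining time indicator is the only candidate for the `StreamRecord` timestamp.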
    
    I agree with @wuchong that we do not need this restriction when we emit a 
`Table` to a `TableSink`.

