Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132854238 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) + val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + + // convert the input type for the conversion mapper + // the input will be changed in the OutputRowtimeProcessFunction later + val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- I think we should avoid implicit defaults like using the timestamp attribute of the left most table (the left most table might not have a time indicator attribute, join order optimization would change the order of tables) and special cases for queries like `SELECT *`. When a `Table` is converted into a `DataStream` it is likely that the resulting stream is further processed by logic that cannot be expressed in SQL / Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make a choice for the `StreamRecord` timestamp, because the semantics of any subsequent time-based operations will depend on that. I see two ways to do that: - ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP` - let the user specify which field should be used as timestamp as an additional parameter of the `toAppendStream` and `toRetractStream` methods. We could also do both. I agree with @wuchong that we do not need this restriction when we emit a `Table` to a `TableSink`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---