[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117031#comment-16117031
 ] 

ASF GitHub Bot commented on FLINK-7337:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131705390
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
    @@ -677,12 +748,43 @@ abstract class StreamTableEnvironment(
     
         val rootParallelism = plan.getParallelism
     
    -    conversion match {
    -      case mapFunction: MapFunction[CRow, A] =>
    -        plan.map(mapFunction)
    -          .returns(tpe)
    -          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    -          .setParallelism(rootParallelism)
    +    val rowtimeFields = logicalType.getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    if (rowtimeFields.isEmpty) {
    +      // to rowtime field to set
    +
    +      conversion match {
    +        case mapFunction: MapFunction[CRow, A] =>
    +          plan.map(mapFunction)
    +            .returns(tpe)
    +            .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    +            .setParallelism(rootParallelism)
    +      }
    +    } else if (rowtimeFields.size == 1) {
    +      // set the only rowtime field as event-time timestamp for DataStream
    +
    +      val mapFunction = conversion match {
    +        case mapFunction: MapFunction[CRow, A] => mapFunction
    +        case _ => new MapFunction[CRow, A] {
    +          override def map(cRow: CRow): A = cRow.asInstanceOf[A]
    +        }
    +      }
    +
    +      plan.process(
    +        new WrappingTimestampSetterProcessFunction[A](
    +          mapFunction,
    +          rowtimeFields.head.getIndex))
    +        .returns(tpe)
    +        .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    +        .setParallelism(rootParallelism)
    +
    +    } else {
    +      throw new TableException(
    +        s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
    +          s"the table that should be converted to a DataStream.\n" +
    +          s"Please select the rowtime field that should be used as 
event-time timestamp for the " +
    +          s"DataStream by casting all other fields to TIMESTAMP or LONG.")
    --- End diff --
    
    I would just recommend TIMESTAMP. LONG is still not supported.


> Refactor handling of time indicator attributes
> ----------------------------------------------
>
>                 Key: FLINK-7337
>                 URL: https://issues.apache.org/jira/browse/FLINK-7337
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to