[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117027#comment-16117027 ]
ASF GitHub Bot commented on FLINK-7337:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131704170

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
       }

       /**
    +    * Injects markers for time indicator fields into the field indexes.
    +    * A rowtime indicator is represented as -1, a proctime indicator as -2.
    +    *
    +    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field indexes.
    +    */
    +  private def adjustFieldIndexes(
    +    fieldIndexes: Array[Int],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[Int] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
    +    } else {
    +      fieldIndexes
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    +      withRowtime.patch(proctime.get._1, Seq(-2), 0) // -2 indicates proctime
    +    } else {
    +      withRowtime
    +    }
    +
    +    withProctime
    +  }
    +
    +  /**
    +    * Injects names of time indicator fields into the list of field names.
    +    *
    +    * @param fieldNames The array of field names into which the time indicator field names are
    +    *                   injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field names.
    +    */
    +  private def adjustFieldNames(
    +    fieldNames: Array[String],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[String] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldNames.patch(rowtime.get._1, Seq(rowtime.get._2), 0)
    +    } else {
    +      fieldNames
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    --- End diff --

    we could use pattern matching here


> Refactor handling of time indicator attributes
> ----------------------------------------------
>
>                 Key: FLINK-7337
>                 URL: https://issues.apache.org/jira/browse/FLINK-7337
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
> I propose the following changes to the current handling of time indicator
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
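
For illustration only, the pattern-matching suggestion in the review comment might look roughly like the sketch below. It is not taken from the pull request. The enclosing object `TimeIndicatorSketch` and the constants `RowtimeMarker` and `ProctimeMarker` are hypothetical names introduced here; the method signatures, the use of `patch`, and the -1/-2 markers come from the quoted diff.

```scala
// Hypothetical standalone sketch; in the PR these are private methods of
// StreamTableEnvironment.
object TimeIndicatorSketch {

  // Marker values as documented in the diff's Scaladoc.
  val RowtimeMarker: Int = -1  // assumed constant name
  val ProctimeMarker: Int = -2 // assumed constant name

  /** Injects time indicator markers into the field indexes. */
  def adjustFieldIndexes(
      fieldIndexes: Array[Int],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[Int] = {

    // inject rowtime marker at its position, if a rowtime indicator is given
    val withRowtime = rowtime match {
      case Some((pos, _)) => fieldIndexes.patch(pos, Seq(RowtimeMarker), 0)
      case None => fieldIndexes
    }

    // inject proctime marker at its position, if a proctime indicator is given
    proctime match {
      case Some((pos, _)) => withRowtime.patch(pos, Seq(ProctimeMarker), 0)
      case None => withRowtime
    }
  }

  /** Injects time indicator field names into the field names. */
  def adjustFieldNames(
      fieldNames: Array[String],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[String] = {

    // inject rowtime field name
    val withRowtime = rowtime match {
      case Some((pos, name)) => fieldNames.patch(pos, Seq(name), 0)
      case None => fieldNames
    }

    // inject proctime field name
    proctime match {
      case Some((pos, name)) => withRowtime.patch(pos, Seq(name), 0)
      case None => withRowtime
    }
  }
}
```

Destructuring the `Option[(Int, String)]` in the `case` avoids the `isDefined` / `get._1` / `get._2` calls, which is presumably the readability gain the comment is after. The behavior is meant to be the same as the `if`/`else` version in the diff.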