[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117045#comment-16117045 ]
ASF GitHub Bot commented on FLINK-7337: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4488#discussion_r131724882 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala --- @@ -18,42 +18,54 @@ package org.apache.flink.table.runtime -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.codegen.Compiler import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row +import org.apache.flink.util.Collector import org.slf4j.LoggerFactory /** - * MapRunner with [[CRow]] output. + * ProcessRunner with [[CRow]] output. 
*/ -class CRowOutputMapRunner( +class CRowOutputProcessRunner( name: String, code: String, @transient var returnType: TypeInformation[CRow]) - extends RichMapFunction[Any, CRow] + extends ProcessFunction[Any, CRow] with ResultTypeQueryable[CRow] - with Compiler[MapFunction[Any, Row]] { + with Compiler[ProcessFunction[Any, Row]] { val LOG = LoggerFactory.getLogger(this.getClass) - private var function: MapFunction[Any, Row] = _ - private var outCRow: CRow = _ + private var function: ProcessFunction[Any, Row] = _ + private var cRowWrapper: CRowWrappingCollector = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating MapFunction.") function = clazz.newInstance() - outCRow = new CRow(null, true) + + this.cRowWrapper = new CRowWrappingCollector() + this.cRowWrapper.setChange(true) } - override def map(in: Any): CRow = { - outCRow.row = function.map(in) - outCRow + override def processElement( + in: Any, + ctx: ProcessFunction[Any, CRow]#Context, + out: Collector[CRow]): Unit = { + + // remove timestamp from stream record + val tc = out.asInstanceOf[TimestampedCollector[_]] --- End diff -- Do we need this change? It is not executed if `org.apache.flink.table.plan.nodes.CommonScan#needsConversion` returns false. 
> Refactor handling of time indicator attributes
> ----------------------------------------------
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] I propose the following changes to the current handling of time indicator attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row (1 bit overhead)
> * Remove materialization of event-time timestamps because the timestamp is already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a {{StreamRecord}}.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)