[ https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568955#comment-16568955 ]
Elias Levy commented on FLINK-9600:
-----------------------------------

[~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape hatch for when you can't express what you want within the higher-level DSL. The improvement I am suggesting is within the higher-level DSL. E.g., it is a lot nicer to write:
{code:java}
// Proposed variant: the user function also receives the record's timestamp
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter extends ProcessFunction[T, T] {
  // Forward the record only if its timestamp falls within daytime
  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = {
    if (isDayTime(ctx.timestamp)) out.collect(value)
  }
}

dataStream.process(new ProcessFilter())
{code}

> Add DataStream transformation variants that pass timestamp to the user function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9600
>                 URL: https://issues.apache.org/jira/browse/FLINK-9600
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Elias Levy
>            Priority: Minor
>
> It is often necessary to access the timestamp assigned to records within user
> functions. At the moment this is only possible from {{RichFunction}}.
> Implementing a {{RichFunction}} just to access the timestamp is burdensome,
> so most jobs carry a duplicate of the timestamp within the record.
> It would be useful if {{DataStream}} provided transformation methods that
> accepted user functions that could be passed the record's timestamp as an
> additional argument, similar to how there are two variants of {{flatMap}},
> one with an extra parameter that gives the user function access to the output
> {{Collector}}.
> Along similar lines, it may be useful to have variants that pass the record's
> key as an additional parameter.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)