[ 
https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568955#comment-16568955
 ] 

Elias Levy commented on FLINK-9600:
-----------------------------------

[~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape 
hatch when you can't perform what you want within the higher level DSL.  The 
improvement I am suggestion is within the higher level DSL.

E.g.it is a lot nicer to write:
{code:java}
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter extends ProcessFunction[T,T] {
  override def processElement(value: T, ctx: Context, out: Collector[T]): Unit 
={
    if (isDayTime(ctx.timestamp))
      out.collect(value) }
  }
} 
dataStream.process(new ProcessFilter())
{code}
 

> Add DataStream transformation variants that pass timestamp to the user 
> function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9600
>                 URL: https://issues.apache.org/jira/browse/FLINK-9600
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Elias Levy
>            Priority: Minor
>
> It is often necessary to access the timestamp assigned to records within user 
> functions.  At the moment this is only possible from {{RichFunction}}. 
> Implementing a {{RichFunction}} just to access the timestamp is burdensome, 
> so most job carry a duplicate of the timestamp within the record.
> It would be useful if {{DataStream}} provided transformation methods that 
> accepted user functions that could be passed the record's timestamp as an 
> additional argument, similar to how there are two variants of {{flatMap}}, 
> one with an extra parameter that gives the user function access to the output 
> {{Collector}}.
> Along similar lines, it may be useful to have variants that pass the record's 
> key as an additional parameter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to