[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525561#comment-15525561 ]
Aljoscha Krettek commented on FLINK-3674:
-----------------------------------------

I want to implement this using two new interfaces:
 - {{TimelyFunction}}: implemented by a user function that needs to deal with timers/time
 - {{TimerService}}: allows the function to set processing-time/event-time timers and query the current time

{code}
/**
 * Interface for user functions that allows them to deal with time.
 *
 * <p>{@link #setTimerService(TimerService)} is called by the system to provide the function
 * with a {@link TimerService} that can be used to query the current time and set timers.
 *
 * <p>{@link #onTimer(long, TimeDomain)} is called when timers fire.
 */
@PublicEvolving
public interface TimelyFunction {

    /**
     * Provides a {@link TimerService} to the function.
     */
    void setTimerService(TimerService timerService);

    /**
     * Called when a timer set using {@link TimerService} fires.
     */
    void onTimer(long timestamp, TimeDomain timeDomain);
}

/**
 * Interface for working with time and timers.
 */
@PublicEvolving
public interface TimerService {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event time. */
    long currentEventTime();

    /**
     * Registers a timer to be fired when processing time passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream}, then that context
     * will also be active when you receive the timer notification.
     */
    void registerProcessingTimeTimer(long time);

    /**
     * Deletes the given timer in the current key/window scope.
     */
    void deleteProcessingTimeTimer(long time);

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream}, then that context
     * will also be active when you receive the timer notification.
     */
    void registerEventTimeTimer(long time);

    /**
     * Deletes the given timer in the current key/window scope.
     */
    void deleteEventTimeTimer(long time);
}
{code}

I put in the delete methods for now, but they will probably go away in the final implementation because deletes cannot be done efficiently.

> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
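
To illustrate how the two interfaces proposed in the comment could fit together, here is a minimal, self-contained sketch in plain Java. It is not Flink code. The {{TimeDomain}} enum, {{ManualTimerService}}, and {{BufferingFunction}} are hypothetical stand-ins invented for this example, and the interfaces are reduced to the non-delete methods, since the comment says the delete methods will probably go away. The sketch assumes a timer fires once the watermark reaches or passes its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Assumed shape of the TimeDomain type referenced by the proposal. */
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

/** Proposed interface, as in the comment (annotations omitted). */
interface TimelyFunction {
    void setTimerService(TimerService timerService);
    void onTimer(long timestamp, TimeDomain timeDomain);
}

/** Proposed interface, reduced to the non-delete methods. */
interface TimerService {
    long currentProcessingTime();
    long currentEventTime();
    void registerProcessingTimeTimer(long time);
    void registerEventTimeTimer(long time);
}

/** Hypothetical single-threaded harness; NOT how Flink implements timers. */
class ManualTimerService implements TimerService {
    private final PriorityQueue<Long> eventTimers = new PriorityQueue<>();
    private final TimelyFunction function;
    private long watermark = Long.MIN_VALUE;

    ManualTimerService(TimelyFunction function) {
        this.function = function;
        function.setTimerService(this); // the "system" hands the service to the function
    }

    public long currentProcessingTime() { return System.currentTimeMillis(); }
    public long currentEventTime() { return watermark; }

    public void registerProcessingTimeTimer(long time) {
        throw new UnsupportedOperationException("not needed for this sketch");
    }

    public void registerEventTimeTimer(long time) { eventTimers.add(time); }

    /** Advances the watermark and fires every event-time timer at or before it. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!eventTimers.isEmpty() && eventTimers.peek() <= watermark) {
            function.onTimer(eventTimers.poll(), TimeDomain.EVENT_TIME);
        }
    }
}

/** Hypothetical user function: buffers values and flushes them when a timer fires. */
class BufferingFunction implements TimelyFunction {
    final List<String> emitted = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();
    private TimerService timers;

    public void setTimerService(TimerService timerService) { this.timers = timerService; }

    /** Buffers the value and asks to be called back 10 time units later in event time. */
    void process(String value, long timestamp) {
        buffer.add(value);
        timers.registerEventTimeTimer(timestamp + 10);
    }

    public void onTimer(long timestamp, TimeDomain timeDomain) {
        if (buffer.isEmpty()) {
            return; // an earlier timer already flushed everything
        }
        emitted.add(String.join(",", buffer) + " @ " + timestamp);
        buffer.clear();
    }
}

class TimelyFunctionSketch {
    public static void main(String[] args) {
        BufferingFunction fn = new BufferingFunction();
        ManualTimerService service = new ManualTimerService(fn);
        fn.process("a", 1L);
        fn.process("b", 5L);
        service.advanceWatermark(12L); // fires the timer registered for time 11
        System.out.println(fn.emitted);
    }
}
```

Compared with the {{onWatermark}} callback in the original issue description, the function here is only called back for timers it registered itself, rather than on every watermark update.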