[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395396#comment-15395396 ]
ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416703

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---
    @@ -55,13 +56,18 @@
         /** the user function */
         protected final F userFunction;
    +
    +    protected EventTimeFunction eventTimeFunction;

         /** Flag to prevent duplicate function.close() calls in close() and dispose() */
         private transient boolean functionsClosed = false;

         public AbstractUdfStreamOperator(F userFunction) {
             this.userFunction = requireNonNull(userFunction);
    +        if(userFunction instanceof EventTimeFunction) {
    --- End diff --

    missing space after if

> Add an interface for EventTime aware User Function
> --------------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: ramkrishna.s.vasudevan
>
> I suggest to add an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>,
> EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
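
For readers following the thread, here is a minimal, self-contained sketch of how the proposed interface and the `instanceof` check from the diff might fit together. It does not use Flink itself: `Watermark` and `MapFunction` are simplified stand-ins for the Flink types, and `SimpleMapOperator` with its `processElement`/`processWatermark` methods is a hypothetical operator written only for illustration, not the code in the pull request.

```java
import java.util.Objects;

public class EventTimeSketch {

    /** Simplified stand-in for Flink's Watermark; only the timestamp is modelled. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Simplified stand-in for Flink's MapFunction. */
    interface MapFunction<I, O> {
        O map(I value);
    }

    /** The interface proposed in the issue description. */
    interface EventTimeFunction {
        void onWatermark(Watermark watermark);
    }

    /** The example mapper from the issue description. */
    static final class MyMapper implements MapFunction<String, String>, EventTimeFunction {
        private long currentEventTime = Long.MIN_VALUE;

        @Override
        public String map(String value) {
            return value + " @ " + currentEventTime;
        }

        @Override
        public void onWatermark(Watermark watermark) {
            currentEventTime = watermark.getTimestamp();
        }
    }

    /** Hypothetical operator: detects EventTimeFunction once, then forwards watermarks. */
    static final class SimpleMapOperator<I, O> {
        private final MapFunction<I, O> userFunction;
        /** Null when the user function does not implement EventTimeFunction. */
        private final EventTimeFunction eventTimeFunction;

        SimpleMapOperator(MapFunction<I, O> userFunction) {
            this.userFunction = Objects.requireNonNull(userFunction);
            if (userFunction instanceof EventTimeFunction) {
                this.eventTimeFunction = (EventTimeFunction) userFunction;
            } else {
                this.eventTimeFunction = null;
            }
        }

        O processElement(I value) {
            return userFunction.map(value);
        }

        void processWatermark(Watermark mark) {
            // Only functions that opted in get notified.
            if (eventTimeFunction != null) {
                eventTimeFunction.onWatermark(mark);
            }
        }
    }

    public static void main(String[] args) {
        SimpleMapOperator<String, String> op = new SimpleMapOperator<>(new MyMapper());
        System.out.println(op.processElement("a")); // a @ -9223372036854775808
        op.processWatermark(new Watermark(42L));
        System.out.println(op.processElement("b")); // b @ 42
    }
}
```

Doing the `instanceof` check once in the constructor keeps the per-watermark path to a single null check, and a function that does not implement `EventTimeFunction` is simply never notified.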