Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r172135104

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
    @@ -70,21 +69,15 @@ public void open() throws Exception {
         @Override
         public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
             collector.setAbsoluteTimestamp(timer.getTimestamp());
    -        onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    -        onTimerContext.timer = timer;
    -        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    -        onTimerContext.timeDomain = null;
    -        onTimerContext.timer = null;
    +        reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
    --- End diff --

    Hate to be picky, but I think the name is a bit misleading, and we could probably put all of this in a single method `invokeUserTime()` that does what `reinitialize()` and `reset()` do. @kl0u I think you can quickly fix that when merging.
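
    A rough sketch of the shape suggested above, assuming one private helper that sets the context, calls the user function, and clears the context again. Everything here is a hypothetical stand-in rather than the actual Flink code: `TimerSketch`, `TimerFunction`, and the simplified `OnTimerContext` only mimic the operator, the timer is reduced to a plain `long` timestamp, and checked exceptions are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for KeyedProcessOperator; not Flink code.
class TimerSketch {

    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    // Stand-in for the operator's reusable timer context.
    static class OnTimerContext {
        TimeDomain timeDomain;
        Long timestamp;
    }

    // Stand-in for the user function's onTimer callback.
    interface TimerFunction {
        void onTimer(long timestamp, OnTimerContext ctx, List<String> out);
    }

    final OnTimerContext onTimerContext = new OnTimerContext();
    final List<String> collector = new ArrayList<>();
    private final TimerFunction userFunction;

    TimerSketch(TimerFunction userFunction) {
        this.userFunction = userFunction;
    }

    void onEventTime(long timestamp) {
        invokeUserTime(TimeDomain.EVENT_TIME, timestamp);
    }

    void onProcessingTime(long timestamp) {
        invokeUserTime(TimeDomain.PROCESSING_TIME, timestamp);
    }

    // Set the context, call the user function, then clear the context,
    // so both timer callbacks share one code path.
    private void invokeUserTime(TimeDomain domain, long timestamp) {
        onTimerContext.timeDomain = domain;
        onTimerContext.timestamp = timestamp;
        userFunction.onTimer(timestamp, onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timestamp = null;
    }

    public static void main(String[] args) {
        TimerSketch op = new TimerSketch((ts, ctx, out) -> out.add(ctx.timeDomain + "@" + ts));
        op.onEventTime(42L);
        op.onProcessingTime(43L);
        System.out.println(op.collector);
    }
}
```

    With this shape, `onEventTime()` and `onProcessingTime()` differ only in the `TimeDomain` they pass, and the context is never left populated between calls.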
---