[ https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986432#comment-15986432 ]
ASF GitHub Bot commented on FLINK-4953: --------------------------------------- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3661 Thanks, @manuzhang I had another look at your changes. I would merge them now but simplify the tests and `WindowOperator` a little, if that's alright with you. In `WindowOperator` I would change `WindowContext` to query the `InternalTimerService` directly, as in: ``` @Override public long currentProcessingTime() { return internalTimerService.currentProcessingTime(); } @Override public long currentWatermark() { return internalTimerService.currentWatermark(); } ``` I would introduce a specific test, like this: ``` @Test public void testEventTimeQuerying() throws Exception { testCurrentTimeQuerying(new EventTimeAdaptor()); } @Test public void testProcessingTimeQuerying() throws Exception { testCurrentTimeQuerying(new ProcessingTimeAdaptor()); } public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); shouldFireOnElement(mockTrigger); when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) .thenReturn(Arrays.asList(new TimeWindow(0, 20))); doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2]; timeAdaptor.verifyCorrectTime(testHarness, context); return null; } }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1]; timeAdaptor.verifyCorrectTime(testHarness, context); return null; } }).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext()); timeAdaptor.advanceTime(testHarness, 10); testHarness.processElement(new StreamRecord<>(0, 0L)); verify(mockWindowFunction, times(1)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector()); timeAdaptor.advanceTime(testHarness, 100); verify(mockWindowFunction, times(1)).clear(anyTimeWindow(), anyInternalWindowContext()); } ``` What do you think? I would change your commit and commit as one thing. > Allow access to "time" in ProcessWindowFunction.Context > ------------------------------------------------------- > > Key: FLINK-4953 > URL: https://issues.apache.org/jira/browse/FLINK-4953 > Project: Flink > Issue Type: Improvement > Components: DataStream API > Reporter: Manu Zhang > Assignee: Manu Zhang > > The recently added {{ProcessWindowFunction}} has a {{Context}} object that > allows querying some additional information about the window firing that we > are processing. Right now, this is only the window for which the firing is > happening. We should extends this with methods that allow querying the > current processing time and the current watermark. > Original text by issue creator: This is similar to FLINK-3674 but exposing > time information in window functions. Currently when a timer is fired, all > states in a window will be returned to users, including those after the > timer. This change will allow users to filter out states after the timer > based on time info. -- This message was sent by Atlassian JIRA (v6.3.15#6346)