[ 
https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986432#comment-15986432
 ] 

ASF GitHub Bot commented on FLINK-4953:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Thanks, @manuzhang I had another look at your changes. I would merge them 
now but simplify the tests and `WindowOperator` a little, if that's alright 
with you.
    
    In `WindowOperator` I would change `WindowContext` to query the 
`InternalTimerService` directly, as in:
    ```
    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }
    
    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }
    ```
    I would introduce a specific test, like this:
    ```
        @Test
        public void testEventTimeQuerying() throws Exception {
                testCurrentTimeQuerying(new EventTimeAdaptor());
        }
    
        @Test
        public void testProcessingTimeQuerying() throws Exception {
                testCurrentTimeQuerying(new ProcessingTimeAdaptor());
        }
    
        public void testCurrentTimeQuerying(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
    
                final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Void> testHarness =
                                createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
    
                testHarness.open();
    
                shouldFireOnElement(mockTrigger);
    
                when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
                                .thenReturn(Arrays.asList(new TimeWindow(0, 
20)));
    
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                                InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
                                timeAdaptor.verifyCorrectTime(testHarness, 
context);
                                return null;
                        }
                }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
    
                doAnswer(new Answer<Object>() {
                        @Override
                        public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                                InternalWindowFunction.InternalWindowContext 
context = 
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
                                timeAdaptor.verifyCorrectTime(testHarness, 
context);
                                return null;
                        }
                }).when(mockWindowFunction).clear(anyTimeWindow(), 
anyInternalWindowContext());
    
                timeAdaptor.advanceTime(testHarness, 10);
    
                testHarness.processElement(new StreamRecord<>(0, 0L));
    
                verify(mockWindowFunction, times(1)).process(anyInt(), 
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<Void>anyCollector());
    
                timeAdaptor.advanceTime(testHarness, 100);
    
                verify(mockWindowFunction, times(1)).clear(anyTimeWindow(), 
anyInternalWindowContext());
        }
    ```
    
    What do you think? I would change your commit and commit as one thing.


> Allow access to "time" in ProcessWindowFunction.Context
> -------------------------------------------------------
>
>                 Key: FLINK-4953
>                 URL: https://issues.apache.org/jira/browse/FLINK-4953
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>
> The recently added {{ProcessWindowFunction}} has a {{Context}} object that 
> allows querying some additional information about the window firing that we 
> are processing. Right now, this is only the window for which the firing is 
> happening. We should extends this with methods that allow querying the 
> current processing time and the current watermark.
> Original text by issue creator: This is similar to FLINK-3674 but exposing 
> time information in window functions. Currently when a timer is fired, all 
> states in a window will be returned to users, including those after the 
> timer. This change will allow users to filter out states after the timer 
> based on time info.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to