[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633419#comment-15633419 ]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2736#discussion_r86388455

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---
@@ -58,6 +61,458 @@
     // For counting if close() is called the correct number of times on the SumReducer
+    /**
+     * Tests CountEvictor evictAfter behavior
+     * @throws Exception
+     */
+    @Test
+    public void testCountEvictorEvictAfter() throws Exception {
+        AtomicInteger closeCalled = new AtomicInteger(0);
+        final int WINDOW_SIZE = 4;
+        final int TRIGGER_COUNT = 2;
+        final boolean EVICT_AFTER = true;
+
+        TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+            (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+        ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+            new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+        EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+            GlobalWindows.create(),
+            new GlobalWindow.Serializer(),
+            new TupleKeySelector(),
+            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+            stateDesc,
+            new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+            CountTrigger.of(TRIGGER_COUNT),
+            CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+            0);
+
+
+        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+            new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+        expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+        expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+        expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+        expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+        expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
--- End diff --

As it is now, it doesn't seem this ever evicts elements because eviction happens after triggering.
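To make the counting concrete, here is a rough stand-alone model of a single key. It does not use any Flink classes. `EvictAfterSketch` and `firedSums` are hypothetical names, and the model assumes the count trigger fires on every second element and that evict-after drops the oldest elements down to the window size once a firing has been emitted. Under those assumptions, the six "key2" elements in the test produce the same sums with or without eviction; only a fourth firing tells the two apart.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one keyed window; an illustration only, not Flink code.
class EvictAfterSketch {

    // Feeds `elements` records of value 1 into the window and returns the
    // sum emitted at each firing.
    static List<Integer> firedSums(int elements, int triggerCount, int maxCount) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> fired = new ArrayList<>();
        int sinceLastFire = 0;

        for (int i = 0; i < elements; i++) {
            window.addLast(1);
            sinceLastFire++;

            if (sinceLastFire == triggerCount) {
                sinceLastFire = 0;

                // The window function sees the full contents first ...
                int sum = 0;
                for (int value : window) {
                    sum += value;
                }
                fired.add(sum);

                // ... and only afterwards are the oldest elements evicted.
                while (window.size() > maxCount) {
                    window.removeFirst();
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Six elements, as "key2" receives in the test: eviction is not observable.
        System.out.println(firedSums(6, 2, 4)); // prints [2, 4, 6]
        // Two more elements: the fourth firing sees 4 kept + 2 new elements.
        System.out.println(firedSums(8, 2, 4)); // prints [2, 4, 6, 6]
    }
}
```

With a window size large enough that nothing is evicted, the fourth sum in this model would be 8 instead of 6.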
If you add these lines that would exercise the eviction:
```
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
```

It checks whether the previous firing with 6 elements truncated back to 4.

> Enhance Window Evictor
> ----------------------
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: vishnu viswanath
> Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the
> beginning). To do this Evictor must go through the list of elements and
> remove the elements that have to be evicted instead of the current approach
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement :
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)