[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633419#comment-15633419 ]

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r86388455
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 ---
    @@ -58,6 +61,458 @@
     
        // For counting if close() is called the correct number of times on the 
SumReducer
     
    +   /**
    +    * Tests CountEvictor evictAfter behavior
    +    * @throws Exception
    +     */
    +   @Test
    +   public void testCountEvictorEvictAfter() throws Exception {
    +           AtomicInteger closeCalled = new AtomicInteger(0);
    +           final int WINDOW_SIZE = 4;
    +           final int TRIGGER_COUNT = 2;
    +           final boolean EVICT_AFTER = true;
    +
    +           TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
    +
    +           @SuppressWarnings({"unchecked", "rawtypes"})
    +           TypeSerializer<StreamRecord<Tuple2<String, Integer>>> 
streamRecordSerializer =
    +                   (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) 
new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
    +
    +           ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc =
    +                   new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
    +
    +
    +           EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
    +                   GlobalWindows.create(),
    +                   new GlobalWindow.Serializer(),
    +                   new TupleKeySelector(),
    +                   BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
    +                   stateDesc,
    +                   new InternalIterableWindowFunction<>(new 
RichSumReducer<GlobalWindow>(closeCalled)),
    +                   CountTrigger.of(TRIGGER_COUNT),
    +                   CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
    +                   0);
    +
    +
    +           OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
    +                   new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    +
    +
    +           long initialTime = 0L;
    +           ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
    +
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
    +
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1998));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
    +
    +
    +
    +           expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
    +           expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
    +           expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
    +
    +           TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    +
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
    +           testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
    +
    +           expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE));
    +           expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), 
Long.MAX_VALUE));
    +
    +           TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    --- End diff --
    
    As it stands, this test never seems to exercise eviction: eviction happens
    after the window fires, and no later firing observes the result. Adding
    these lines would exercise it:
    
    ```
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
    expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    ```
    
    This checks that the window contents were truncated back to 4 elements
    after the previous firing, which emitted a sum over 6 elements.
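
To make the expected value of 6 easier to follow, here is a minimal standalone sketch of the semantics being tested. It is not Flink code: the class `CountEvictAfterSketch` and its `simulate` method are hypothetical, and it assumes a count trigger that fires every `triggerCount` elements and an evictor that trims the buffer from the front to `maxSize` elements after each firing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CountEvictAfterSketch {

    /**
     * Feeds `elements` values of 1 into a single-key buffer and returns the
     * sum emitted at each firing. Hypothetical helper, not part of Flink.
     */
    public static List<Integer> simulate(int elements, int triggerCount, int maxSize) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> firings = new ArrayList<>();
        int sinceLastFire = 0;

        for (int i = 0; i < elements; i++) {
            buffer.addLast(1);
            sinceLastFire++;

            if (sinceLastFire == triggerCount) {
                sinceLastFire = 0;

                // The window function (a sum) sees the whole buffer first.
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                firings.add(sum);

                // Eviction runs only after the firing, trimming from the front.
                while (buffer.size() > maxSize) {
                    buffer.removeFirst();
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Six "key2" elements, as in the test so far: eviction is never observed.
        System.out.println(simulate(6, 2, 4)); // [2, 4, 6]
        // Two more elements: the fourth firing sees 4 retained + 2 new elements.
        System.out.println(simulate(8, 2, 4)); // [2, 4, 6, 6]
        // Without an effective size limit the fourth firing would emit 8.
        System.out.println(simulate(8, 2, Integer.MAX_VALUE)); // [2, 4, 6, 8]
    }
}
```

Under these assumptions the fourth firing emits 6 rather than 8 only if the buffer was cut back to 4 after the third firing, which is what the extra assertion would detect.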


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must iterate over the list of elements 
> and remove those that have to be evicted, instead of the current approach of 
> returning the number of elements to remove from the beginning.
> - Allow eviction to be done before or after applying the window function.
> FLIP page for this enhancement: 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
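
The difference between the two eviction styles in the description can be sketched in plain Java. This is only an illustration under stated assumptions: `SimpleEvictor`, `keepLast`, and `dropBelow` are hypothetical names and do not reproduce Flink's actual `Evictor` interface. The point is that an evictor handed an iterable can remove elements at any position through `Iterator.remove()`, whereas a count-based approach can only drop a prefix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorEvictionSketch {

    /** Hypothetical, simplified evictor; not Flink's actual interface. */
    public interface SimpleEvictor<T> {
        void evict(Iterable<T> elements, int size);
    }

    /** Prefix-style eviction: drop from the beginning until at most maxCount remain. */
    public static <T> SimpleEvictor<T> keepLast(int maxCount) {
        return (elements, size) -> {
            int toRemove = size - maxCount;
            Iterator<T> it = elements.iterator();
            while (toRemove > 0 && it.hasNext()) {
                it.next();
                it.remove();
                toRemove--;
            }
        };
    }

    /** Any-order eviction: remove every element below the threshold, wherever it sits. */
    public static SimpleEvictor<Integer> dropBelow(int threshold) {
        return (elements, size) -> {
            Iterator<Integer> it = elements.iterator();
            while (it.hasNext()) {
                if (it.next() < threshold) {
                    it.remove();
                }
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(5, 1, 7, 2, 9));

        // Removes 1 and 2 from the middle of the window.
        dropBelow(3).evict(window, window.size());
        System.out.println(window); // [5, 7, 9]

        // Removes only from the front, as a count-based evictor would.
        IteratorEvictionSketch.<Integer>keepLast(2).evict(window, window.size());
        System.out.println(window); // [7, 9]
    }
}
```

The same sketch interface could be invoked either before or after the window function runs, which is the second point in the description.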



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
