[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633418#comment-15633418
 ] 

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r86385660
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
 ---
    @@ -35,23 +37,63 @@
        private static final long serialVersionUID = 1L;
     
        private final long windowSize;
    +   private final boolean doEvictAfter;
     
        public TimeEvictor(long windowSize) {
                this.windowSize = windowSize;
    +           this.doEvictAfter = false;
    +   }
    +
    +   public TimeEvictor(long windowSize, boolean doEvictAfter) {
    +           this.windowSize = windowSize;
    +           this.doEvictAfter = doEvictAfter;
        }
     
    +
        @Override
    -   public int evict(Iterable<StreamRecord<Object>> elements, int size, W 
window) {
    -           int toEvict = 0;
    -           long currentTime = Iterables.getLast(elements).getTimestamp();
    +   public void evictBefore(Iterable<TimestampedValue<Object>> elements, 
int size, W window, EvictorContext ctx) {
    +           if(!doEvictAfter) {
    +                   evict(elements,size,ctx);
    +           }
    +   }
    +
    +   @Override
    +   public void evictAfter(Iterable<TimestampedValue<Object>> elements, int 
size, W window, EvictorContext ctx) {
    +           if(doEvictAfter) {
    +                   evict(elements,size,ctx);
    +           }
    +   }
    +
    +   private void evict(Iterable<TimestampedValue<Object>> elements, int 
size, EvictorContext ctx) {
    +           long currentTime = getMaxTimestamp(elements);
    +
    +           if (currentTime < 0) {
    +                   //we don't evict any element if timestamp is not set in 
the record
    +                   return;
    +           }
                long evictCutoff = currentTime - windowSize;
    -           for (StreamRecord<Object> record: elements) {
    -                   if (record.getTimestamp() > evictCutoff) {
    -                           break;
    +
    +           for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext(); ) {
    +                   TimestampedValue<Object> record = iterator.next();
    +                   if (record.getTimestamp() <= evictCutoff) {
    +                           iterator.remove();
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * @param elements The elements currently in the pane.
    +    * @return The maximum value of timestamp among the elements
    +     */
    +   private long getMaxTimestamp(Iterable<TimestampedValue<Object>> 
elements) {
    +           long currentTime = Long.MIN_VALUE;
    +           for (Iterator<TimestampedValue<Object>> iterator = 
elements.iterator(); iterator.hasNext();){
    +                   TimestampedValue<Object> record = iterator.next();
    +                   if (record.getTimestamp() > currentTime) {
    --- End diff --
    
    This could be more succinctly expressed as `currentTime = 
Math.max(currentTime, record.getTimestamp());`


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to