[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633418#comment-15633418 ]
ASF GitHub Bot commented on FLINK-4174: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2736#discussion_r86385660 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java --- @@ -35,23 +37,63 @@ private static final long serialVersionUID = 1L; private final long windowSize; + private final boolean doEvictAfter; public TimeEvictor(long windowSize) { this.windowSize = windowSize; + this.doEvictAfter = false; + } + + public TimeEvictor(long windowSize, boolean doEvictAfter) { + this.windowSize = windowSize; + this.doEvictAfter = doEvictAfter; } + @Override - public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) { - int toEvict = 0; - long currentTime = Iterables.getLast(elements).getTimestamp(); + public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) { + if(!doEvictAfter) { + evict(elements,size,ctx); + } + } + + @Override + public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) { + if(doEvictAfter) { + evict(elements,size,ctx); + } + } + + private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { + long currentTime = getMaxTimestamp(elements); + + if (currentTime < 0) { + //we don't evict any element if timestamp is not set in the record + return; + } long evictCutoff = currentTime - windowSize; - for (StreamRecord<Object> record: elements) { - if (record.getTimestamp() > evictCutoff) { - break; + + for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) { + TimestampedValue<Object> record = iterator.next(); + if (record.getTimestamp() <= evictCutoff) { + iterator.remove(); + } + } + } + + /** + * @param elements The elements currently in the pane. 
+ * @return The maximum value of timestamp among the elements + */ + private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) { + long currentTime = Long.MIN_VALUE; + for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){ + TimestampedValue<Object> record = iterator.next(); + if (record.getTimestamp() > currentTime) { --- End diff -- This could be more succinctly expressed as `currentTime = Math.max(currentTime, record.getTimestamp());` > Enhance Window Evictor > ---------------------- > > Key: FLINK-4174 > URL: https://issues.apache.org/jira/browse/FLINK-4174 > Project: Flink > Issue Type: Sub-task > Components: Streaming > Reporter: vishnu viswanath > Assignee: vishnu viswanath > > Enhance the current functionality of Evictor as per this [design > document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit]. > This includes: > - Allow eviction of elements from the window in any order (not only from the > beginning). To do this, the Evictor must go through the list of elements and > remove the elements that have to be evicted, instead of the current approach > of returning the count of elements to be removed from the beginning. > - Allow eviction to be done before/after applying the window function. > FLIP page for this enhancement: > [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor] -- This message was sent by Atlassian JIRA (v6.3.4#6332)