[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634612#comment-15634612 ]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user soniclavier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r86462637

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
    @@ -273,32 +284,93 @@ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
                 return;
             }
     
    -        TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
    -        if (triggerResult.isFire()) {
    -            fire(context.window, contents);
    -        }
    +        TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
    +        if (triggerResult.isFire()) {
    +            fire(context.window, contents, windowState);
    +        }
     
             if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
                 cleanup(context.window, windowState, mergingWindows);
             }
         }
     
    -    private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    +    private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
             timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
     
             // Work around type system restrictions...
    -        int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
    -
    -        FluentIterable<IN> projectedContents = FluentIterable
    +        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
                 .from(contents)
    -            .skip(toEvict)
    -            .transform(new Function<StreamRecord<IN>, IN>() {
    +            .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
    +                @Override
    +                public TimestampedValue<IN> apply(StreamRecord<IN> input) {
    +                    return new TimestampedValue<>(input.getValue(), input.getTimestamp());
    +                }
    +            });
    +        evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
    +
    +        FluentIterable<IN> projectedContents = recordsWithTimestamp
    +            .transform(new Function<TimestampedValue<IN>, IN>() {
                     @Override
    -                public IN apply(StreamRecord<IN> input) {
    +                public IN apply(TimestampedValue<IN> input) {
                         return input.getValue();
                     }
                 });
    +        userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
    +        evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
    +
    +
    +        //work around to fix FLINK-4369, remove the evicted elements from the windowState.
    +        //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
    +        windowState.clear();
    +        for(TimestampedValue<IN> record : recordsWithTimestamp) {
    +            if (record.getTimestamp() < 0) {
    --- End diff --

    Regarding the copy method: are you asking me to add a copy method to TimestampedValue that returns the corresponding StreamRecord, something like this:

    ```java
    /**
     * Creates a {@link StreamRecord} from this TimestampedValue.
     */
    public StreamRecord<T> getStreamRecord() {
        StreamRecord<T> streamRecord = new StreamRecord<>(value);
        if (hasTimestamp) {
            streamRecord.setTimestamp(timestamp);
        }
        return streamRecord;
    }
    ```

> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the beginning). To do this, the Evictor must go through the list of elements and remove the ones that have to be evicted, instead of the current approach of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
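
For readers following the issue description above, here is a minimal, self-contained sketch of what "eviction in any order" can look like: the evictor walks the window contents and removes individual elements through the iterator, rather than returning a count to skip from the beginning. This is not the Flink implementation. `EvictionSketch`, its nested `Timestamped` class, and `evictOlderThan` are hypothetical names made up for illustration, and a plain `ArrayList` stands in for the window contents.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class EvictionSketch {

    /** Hypothetical stand-in for a value paired with its timestamp (not Flink's class). */
    static final class Timestamped<T> {
        final T value;
        final long timestamp;

        Timestamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Removes every element whose timestamp is more than keepMillis older than
     * the newest element, regardless of its position in the window.
     * Returns the number of evicted elements.
     */
    static <T> int evictOlderThan(Iterable<Timestamped<T>> elements, long keepMillis) {
        // First pass: find the newest timestamp in the window.
        long max = Long.MIN_VALUE;
        for (Timestamped<T> e : elements) {
            max = Math.max(max, e.timestamp);
        }
        long cutoff = max - keepMillis;

        // Second pass: remove stale elements in place via the iterator.
        int evicted = 0;
        for (Iterator<Timestamped<T>> it = elements.iterator(); it.hasNext();) {
            if (it.next().timestamp < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<Timestamped<String>> window = new ArrayList<>();
        window.add(new Timestamped<>("a", 100L));
        window.add(new Timestamped<>("b", 10L)); // old element in the middle
        window.add(new Timestamped<>("c", 95L));
        window.add(new Timestamped<>("d", 20L)); // old element at the end

        int evicted = evictOlderThan(window, 50L);
        System.out.println(evicted + " evicted, " + window.size() + " kept");
    }
}
```

Running `main` prints `2 evicted, 2 kept`: the stale elements `b` and `d` are dropped even though neither is at the head of the window, which a count-from-the-beginning evictor could not express. The same routine could be called before or after the window function, which is the second point in the issue description.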