[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634612#comment-15634612
 ] 

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user soniclavier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r86462637
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 ---
    @@ -273,32 +284,93 @@ public void onProcessingTime(InternalTimer<K, W> 
timer) throws Exception {
                        return;
                }
     
    -           TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
    -           if (triggerResult.isFire()) {
    -                   fire(context.window, contents);
    -           }
    +                           TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
    +                           if (triggerResult.isFire()) {
    +                                   fire(context.window, contents, 
windowState);
    +                           }
     
                if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
                        cleanup(context.window, windowState, mergingWindows);
                }
        }
     
    -   private void fire(W window, Iterable<StreamRecord<IN>> contents) throws 
Exception {
    +   private void fire(W window, Iterable<StreamRecord<IN>> contents, 
ListState<StreamRecord<IN>> windowState) throws Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
     
                // Work around type system restrictions...
    -           int toEvict = evictor.evict((Iterable) contents, 
Iterables.size(contents), context.window);
    -
    -           FluentIterable<IN> projectedContents = FluentIterable
    +           FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = 
FluentIterable
                        .from(contents)
    -                   .skip(toEvict)
    -                   .transform(new Function<StreamRecord<IN>, IN>() {
    +                   .transform(new Function<StreamRecord<IN>, 
TimestampedValue<IN>>() {
    +                           @Override
    +                           public TimestampedValue<IN> 
apply(StreamRecord<IN> input) {
    +                                   return new 
TimestampedValue<>(input.getValue(), input.getTimestamp());
    +                           }
    +                   });
    +           evictorContext.evictBefore(recordsWithTimestamp, 
Iterables.size(recordsWithTimestamp));
    +
    +           FluentIterable<IN> projectedContents = recordsWithTimestamp
    +                   .transform(new Function<TimestampedValue<IN>, IN>() {
                                @Override
    -                           public IN apply(StreamRecord<IN> input) {
    +                           public IN apply(TimestampedValue<IN> input) {
                                        return input.getValue();
                                }
                        });
    +
                userFunction.apply(context.key, context.window, 
projectedContents, timestampedCollector);
    +           evictorContext.evictAfter(recordsWithTimestamp, 
Iterables.size(recordsWithTimestamp));
    +
    +
    +           //work around to fix FLINK-4369, remove the evicted elements 
from the windowState.
    +           //this is inefficient, but there is no other way to remove 
elements from ListState, which is an AppendingState.
    +           windowState.clear();
    +           for(TimestampedValue<IN> record : recordsWithTimestamp) {
    +                   if (record.getTimestamp() < 0) {
    --- End diff --
    
    Regarding the copy method: Are you asking me to add a copy method in the 
TimestampedValue that will return a corresponding StreamRecord, something like 
this:
    
    ```java
    /**
     * Creates a {@link StreamRecord} from this TimestampedValue.
     */
    public StreamRecord<T> getStreamRecord() {
        StreamRecord<T> streamRecord = new StreamRecord<>(value);
        if (hasTimestamp) {
                streamRecord.setTimestamp(timestamp);
        }
        return streamRecord;
    }
    ```


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to