[ https://issues.apache.org/jira/browse/FLINK-4239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391914#comment-15391914 ]

ASF GitHub Bot commented on FLINK-4239:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2278#discussion_r72065896
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---
    @@ -44,39 +44,36 @@ private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
        @Override
        public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
                TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
    -           switch (triggerResult) {
    -                   case FIRE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   case FIRE_AND_PURGE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   default:
    -                           return TriggerResult.CONTINUE;
    +           if (triggerResult.isFire()) {
    +                   return TriggerResult.FIRE_AND_PURGE;
    +           } else if (triggerResult.isPurge()) {
    +                   return TriggerResult.PURGE;
    +           } else {
    +                   return TriggerResult.CONTINUE;
                }
        }
     
        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
                TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
    -           switch (triggerResult) {
    -                   case FIRE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   case FIRE_AND_PURGE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   default:
    -                           return TriggerResult.CONTINUE;
    +           if (triggerResult.isFire()) {
    +                   return TriggerResult.FIRE_AND_PURGE;
    +           } else if (triggerResult.isPurge()) {
    +                   return TriggerResult.PURGE;
    +           } else {
    +                   return TriggerResult.CONTINUE;
                }
        }
     
        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
                TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
    -           switch (triggerResult) {
    -                   case FIRE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   case FIRE_AND_PURGE:
    -                           return TriggerResult.FIRE_AND_PURGE;
    -                   default:
    -                           return TriggerResult.CONTINUE;
    +           if (triggerResult.isFire()) {
    +                   return TriggerResult.FIRE_AND_PURGE;
    +           } else if (triggerResult.isPurge()) {
    +                   return TriggerResult.PURGE;
    +           } else {
    +                   return TriggerResult.CONTINUE;
                }
        }
     
    --- End diff --
    
    Same as above.
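
For reference, a minimal standalone sketch of what the hunks above change. The nested `Result` enum is a stand-in for Flink's `TriggerResult` (same four constants, same `isFire()`/`isPurge()` accessors), and `oldMapping`/`newMapping` are hypothetical names for the removed switch and the added if/else chain. Assuming that stand-in is faithful, the only input whose outcome differs is `PURGE`: the old code mapped it to `CONTINUE`, the new code passes it through.

```java
/** Standalone sketch; Result is a stand-in for Flink's TriggerResult, not the real class. */
class PurgingMappingSketch {

    /** Stand-in enum: each constant carries a fire flag and a purge flag. */
    enum Result {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    /** Hypothetical name for the removed switch: anything that does not fire becomes CONTINUE. */
    static Result oldMapping(Result nested) {
        switch (nested) {
            case FIRE:
            case FIRE_AND_PURGE:
                return Result.FIRE_AND_PURGE;
            default:
                return Result.CONTINUE;
        }
    }

    /** Hypothetical name for the added if/else chain: a nested PURGE is now propagated. */
    static Result newMapping(Result nested) {
        if (nested.isFire()) {
            return Result.FIRE_AND_PURGE;
        } else if (nested.isPurge()) {
            return Result.PURGE;
        } else {
            return Result.CONTINUE;
        }
    }

    public static void main(String[] args) {
        // Print both mappings side by side for every possible nested result.
        for (Result r : Result.values()) {
            System.out.println(r + " -> old: " + oldMapping(r) + ", new: " + newMapping(r));
        }
    }
}
```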


> Set Default Allowed Lateness to Zero and Make Triggers Non-Purging
> ------------------------------------------------------------------
>
>                 Key: FLINK-4239
>                 URL: https://issues.apache.org/jira/browse/FLINK-4239
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> In preparation for upcoming changes in 1.2 we should set the default allowed 
> lateness to zero and make all built-in triggers non-purging by default. 
> Currently, {{EventTimeTrigger}} and {{ProcessingTimeTrigger}} purge when they 
> fire. This leads to unexpected behavior when a user sets a meaningful 
> allowed lateness: the window is purged as soon as it fires, so its state is 
> not actually kept for the duration of the allowed lateness.
> Changing the behavior to non-purging requires changing the default allowed 
> lateness from {{Long.MAX_VALUE}} to {{0}}. Otherwise we would have memory 
> leaks.
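
A rough illustration of the memory-leak argument above, not Flink's actual implementation. It assumes that a non-purging window keeps its state until the watermark reaches the window's maximum timestamp plus the allowed lateness; `cleanupTime` and `canDropState` are hypothetical helper names. Under that assumption, an allowed lateness of `Long.MAX_VALUE` pushes the cleanup time out of reach of any watermark below `Long.MAX_VALUE`, while `0` lets state go as soon as the watermark passes the window.

```java
/** Standalone sketch of the retention reasoning; all names here are hypothetical. */
class LatenessCleanupSketch {

    /**
     * Time at which a window's state may be dropped, under the assumption stated above.
     * Assumes non-negative arguments; saturates at Long.MAX_VALUE instead of overflowing.
     */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    /** True once the watermark has reached the cleanup time. */
    static boolean canDropState(long watermark, long windowMaxTimestamp, long allowedLateness) {
        return watermark >= cleanupTime(windowMaxTimestamp, allowedLateness);
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 999L;

        // Allowed lateness 0: state can go once the watermark passes the window.
        System.out.println(canDropState(1_000L, windowMaxTimestamp, 0L));

        // Allowed lateness Long.MAX_VALUE: state is still held far past the window.
        System.out.println(canDropState(1_000_000L, windowMaxTimestamp, Long.MAX_VALUE));
    }
}
```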



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)