[ 
https://issues.apache.org/jira/browse/FLINK-7731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16200208#comment-16200208
 ] 

Gerard Garcia commented on FLINK-7731:
--------------------------------------

I tried that but it still doesn't work. I have been doing some debugging and it 
seems to me that the timer never gets triggered. It may be that what is 
happening is that the {{onProcessingTime}} callback is only triggered when 
there are contents in the window state (all code snippets are from version 
1.3.2):

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#onProcessingTime()
{code:java}
                ACC contents = null;
                if (windowState != null) {
                        contents = windowState.get();
                }

                if (contents != null) {
                        TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
                        if (triggerResult.isFire()) {
                                emitWindowContents(triggerContext.window, 
contents);
                        }
                        if (triggerResult.isPurge()) {
                                windowState.clear();
                        }
                }
{code}

and that state is being cleared when the trigger result is type purge:

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement()
{code:java}
                                TriggerResult triggerResult = 
triggerContext.onElement(element);

                                if (triggerResult.isFire()) {
                                        ACC contents = windowState.get();
                                        if (contents == null) {
                                                continue;
                                        }
                                        emitWindowContents(window, contents);
                                }

                                if (triggerResult.isPurge()) {
                                        windowState.clear();
                                }
{code}


> Trigger on GlobalWindow does not clean state completely
> -------------------------------------------------------
>
>                 Key: FLINK-7731
>                 URL: https://issues.apache.org/jira/browse/FLINK-7731
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.2
>            Reporter: Gerard Garcia
>            Priority: Minor
>
> I have an operator that consists of:
> CoGroup Datastream -> GlobalWindow -> CustomTrigger -> Apply function
> The custom trigger fires and purges the elements after it has received the 
> expected number of elements (or when a timeout fires) from one of the streams 
> and the apply function merges the elements with the ones received from the 
> other stream. It appears that the state of the operator grows continuously so 
> it seems it never gets completely cleaned.
> There is a discussion in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clean-GlobalWidnow-state-td15613.html
>  that suggests that it may be a bug.
> This job reproduces the issue: 
> https://github.com/GerardGarcia/flink-global-window-growing-state



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to