Hi,

I have the following operator:

mainStream
      .coGroup(coStream)
      .where(_.uuid).equalTo(_.uuid)
      .window(GlobalWindows.create())
      .trigger(triggerWhenAllReceived)
      .apply(mergeElements)

TLDR; It seems that the checkpointed state of the operator keeps growing
forever even if I clear the state and purge the buffered elements using a
processing time trigger.

Details:

Basically I have a main stream that gets elements from another stream and
when it has received all the elements that have been waiting for it outputs
a new element that has been created using the information of all the
received elements.

To do so I use a GlobalWindow and a custom trigger. The custom trigger has
as state two counters, the elements that it has to receive (extracted from
the element received from the main stream) and the elements that it has
received so far from the other stream. When the two counters have the same
value I use the FIRE_AND_PURGE trigger to output all the elements in the
pane (I understand that each set of elements is stored in a pane defined by
the global window and the UUID key). 

To cleanup the state (and to not keep elements waiting forever) I setup a
processing time timer which basically clears the state and outputs
FIRE_AND_PURGE to remove the buffered elements.

I must be missing something because the checkpointed state keeps growing
forever so I suspect that the pane is not completely removed.

Gerard




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to