Caveat: it has been a long time and I don't really know the details of the
FlinkRunner. But I can answer a couple of questions.

On Fri, Sep 15, 2023 at 7:07 PM Hemant Kumar via dev <dev@beam.apache.org>
wrote:

> Hi Team,
>
> I am facing an issue running a stateful Beam job on Flink.
>
> *Problem Statement:*
>     A stateful Beam application with a TUMBLE window, running on the Flink
> Runner, whose checkpoint size keeps increasing over time.
>
> *Observation:*
>    Memory usage keeps increasing over time, and the Kubernetes pods get
> OOM-killed (exit code 137).
>
> *Version:*
>     Beam version 2.32, Flink version 1.13.6, state backend:
> EmbeddedRocksDB (com.ververica - frocksdbjni - 6.20.3-ververica-2.0)
>
> *Assumption:*
>    State is never cleared from the state backend, even after the window is closed.
>
>
> *Questions:*
>   1. What is the significance of currentSideInputWatermark in
> *org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator*
> and how does it affect an application without side inputs?
>
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767
>

If you have a main input and a side input, each main input window is
buffered until the side input is "ready" for that window to be processed.
That particular line is about flushing all the remaining buffered data once
the side input is fully ready and you are guaranteed to never see more data
on the side input.
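
To make the buffering above concrete, here is a minimal, hypothetical Java
model of it. It is not the actual DoFnOperator code: the class PushbackSketch,
its methods, and the per-window "ready" set are illustrative assumptions, and
END_OF_TIME stands in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(). It
shows buffered elements being released either when the side input becomes
ready for their window or all at once when the side-input watermark reaches
the end of time.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of side-input push-back.
 * NOT the real DoFnOperator; all names here are illustrative.
 */
public class PushbackSketch {
    // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis() (assumption).
    static final long END_OF_TIME = Long.MAX_VALUE;

    static final class Element {
        final String value;
        final long window; // window identified by its end timestamp
        Element(String value, long window) { this.value = value; this.window = window; }
    }

    private final Set<Long> readyWindows = new HashSet<>();
    private final List<Element> pushedBack = new ArrayList<>(); // would be checkpointed state
    final List<String> emitted = new ArrayList<>();
    private long sideInputWatermark = Long.MIN_VALUE;

    /** Main input: process now if the side input is ready for this window, else buffer it. */
    void processElement(String value, long window) {
        if (readyWindows.contains(window)) {
            emitted.add(value);
        } else {
            pushedBack.add(new Element(value, window));
        }
    }

    /** Side input became ready for a window: release buffered elements of that window. */
    void onSideInputReady(long window) {
        readyWindows.add(window);
        Iterator<Element> it = pushedBack.iterator();
        while (it.hasNext()) {
            Element e = it.next();
            if (e.window == window) {
                emitted.add(e.value);
                it.remove();
            }
        }
    }

    /** At the end of time no more side-input data can arrive, so flush everything left. */
    void onSideInputWatermark(long watermark) {
        sideInputWatermark = watermark;
        if (sideInputWatermark >= END_OF_TIME) {
            for (Element e : pushedBack) {
                emitted.add(e.value);
            }
            pushedBack.clear();
        }
    }

    int bufferedCount() { return pushedBack.size(); }

    public static void main(String[] args) {
        PushbackSketch op = new PushbackSketch();
        op.processElement("a", 10L);  // buffered: side input not ready for window 10
        op.processElement("b", 20L);  // buffered
        op.onSideInputReady(10L);     // "a" is released
        op.onSideInputWatermark(15L); // not the end of time: "b" stays buffered
        System.out.println(op.emitted + " buffered=" + op.bufferedCount()); // [a] buffered=1
        op.onSideInputWatermark(END_OF_TIME); // side input complete: "b" is flushed
        System.out.println(op.emitted + " buffered=" + op.bufferedCount()); // [a, b] buffered=0
    }
}
```

In this model, whatever sits in the push-back buffer is state that would be
checkpointed, so a buffer that only drains at the end of time grows for as
long as the job runs. Flushing unconditionally on every watermark shrinks the
buffer, but in this model it would also release elements before the side
input is ready for their window.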

For the rest of the questions, I don't know when the under-the-hood state is
cleared out.

Kenn


>     Removing the check *if (currentSideInputWatermark >=
> BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())* and calling
> *emitAllPushedBackData()* on every processWatermark call reduces the
> checkpoint size, which otherwise keeps increasing.
>
>   2. When is state cleared for the WindowDoFn (TUMBLE windows) on window
> closure? When do global state and timers get cleared?
>
>   3. Is clearing timer and keyed-state information as follows enough to
> prevent ever-increasing memory usage and checkpoint size?
>
>     *Flush on watermark:*
>
>         pushedBackElementsHandler.clear();
>
>     *Timer removal:*
>
>         keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());
>
>     *Global removal:*
>
>         keyedStateInternals.clearGlobalState();
>
>   -Hemant
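
Questions 2 and 3 are not answered in this thread. For reference, the Beam
model treats per-window state as garbage once the watermark passes the end of
the window plus the allowed lateness; whether and when the FlinkRunner
physically removes it from RocksDB is exactly the open question. Below is a
hypothetical Java model of that watermark-driven cleanup, not runner code:
WindowStateGcSketch, its "key@window" namespaces, and the rule that a cleanup
timer fires once the watermark reaches the window's max timestamp plus the
allowed lateness are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical model of watermark-driven cleanup of per-window state.
 * Not Beam or FlinkRunner code; names and the firing rule are assumptions.
 */
public class WindowStateGcSketch {
    private final long allowedLateness;
    // State namespace "key@windowMaxTs" -> accumulated value.
    private final Map<String, Long> state = new HashMap<>();
    // Event-time cleanup timers: cleanup time -> namespaces to clear at that time.
    private final NavigableMap<Long, Set<String>> cleanupTimers = new TreeMap<>();

    WindowStateGcSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Saturating add so end-of-time windows do not overflow past Long.MAX_VALUE. */
    private static long cleanupTime(long windowMaxTs, long lateness) {
        return windowMaxTs > Long.MAX_VALUE - lateness ? Long.MAX_VALUE : windowMaxTs + lateness;
    }

    /** Write to a window's state; the first write registers that window's cleanup timer. */
    void add(String key, long windowMaxTs, long value) {
        String ns = key + "@" + windowMaxTs;
        if (!state.containsKey(ns)) {
            long at = cleanupTime(windowMaxTs, allowedLateness);
            cleanupTimers.computeIfAbsent(at, t -> new HashSet<>()).add(ns);
        }
        state.merge(ns, value, Long::sum);
    }

    /** Advance the watermark: fire every cleanup timer at or before it and drop its state. */
    void advanceWatermark(long watermark) {
        NavigableMap<Long, Set<String>> due = cleanupTimers.headMap(watermark, true);
        for (Set<String> namespaces : due.values()) {
            for (String ns : namespaces) {
                state.remove(ns);
            }
        }
        due.clear(); // headMap is a view, so this also deletes the fired timers
    }

    int liveStateEntries() {
        return state.size();
    }

    int pendingTimers() {
        int n = 0;
        for (Set<String> namespaces : cleanupTimers.values()) {
            n += namespaces.size();
        }
        return n;
    }

    public static void main(String[] args) {
        WindowStateGcSketch op = new WindowStateGcSketch(5L);
        op.add("k", 10L, 1L);            // tumbling window with max timestamp 10
        op.add("k", 20L, 2L);            // next tumbling window
        op.add("k", Long.MAX_VALUE, 3L); // window ending at the end of time
        op.advanceWatermark(16L);        // 10 + 5 <= 16: first window's state is dropped
        System.out.println(op.liveStateEntries() + " " + op.pendingTimers()); // 2 2
        op.advanceWatermark(100L);       // 20 + 5 <= 100: second window's state is dropped
        System.out.println(op.liveStateEntries() + " " + op.pendingTimers()); // 1 1
    }
}
```

The last entry shows the catch: state whose window ends at (or saturates to)
the end of time, as the global window's effectively does, is never reached by
the watermark of a running streaming job, so in this model it is only removed
if some code clears it explicitly.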
