Hey Flink Community,

I'm working on a Flink application in which we implement operators that
extend RichFlatMapFunction and RichCoFlatMapFunction. As a result, we work
directly with Flink's state API (ValueState, ListState, MapState).
Being able to monitor the state size of each operator would be extremely
valuable: my team has already run into a few cases where our state
exploded and jobs failed because YARN killed containers that exceeded
their memory limits.

My understanding is that the best way to monitor this is to watch the
checkpoint size per operator instance. This gets confusing with
incremental checkpointing, because the reported numbers appear to be the
delta in state size, not the actual state size at that point in time. For
my team's application, the total state size is not the sum of those
deltas. What is the best way to get the total checkpoint size per operator
for each checkpoint?
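To illustrate why the deltas need not add up, here is a simplified, hypothetical model (not Flink code or a Flink API) of incremental checkpointing with a RocksDB-style backend. It assumes each checkpoint references a set of immutable files, that only files never uploaded before count toward a checkpoint's reported delta, and that compaction replaces old files with merged ones. File names and sizes are made up.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of incremental checkpointing (not Flink API).
// A checkpoint references a set of immutable files (name -> size); only files
// that were never uploaded before count toward that checkpoint's "delta".
class IncrementalCheckpointModel {
    private final Set<String> uploaded = new HashSet<>();

    /** Registers a checkpoint and returns the total size of its newly uploaded files. */
    long checkpoint(Map<String, Long> referencedFiles) {
        long delta = 0;
        for (Map.Entry<String, Long> file : referencedFiles.entrySet()) {
            if (uploaded.add(file.getKey())) { // true only the first time a file is seen
                delta += file.getValue();
            }
        }
        return delta;
    }

    /** Full size of a checkpoint: every file it references, shared or not. */
    static long fullSize(Map<String, Long> referencedFiles) {
        long total = 0;
        for (long size : referencedFiles.values()) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        IncrementalCheckpointModel model = new IncrementalCheckpointModel();
        List<Map<String, Long>> checkpoints = List.of(
                Map.of("a.sst", 100L, "b.sst", 100L),               // checkpoint 1
                Map.of("a.sst", 100L, "b.sst", 100L, "c.sst", 50L), // checkpoint 2: one new file
                Map.of("d.sst", 180L));                             // checkpoint 3: a, b, c compacted into d
        long sumOfDeltas = 0;
        for (Map<String, Long> cp : checkpoints) {
            long delta = model.checkpoint(cp);
            sumOfDeltas += delta;
            System.out.println("delta=" + delta + " full=" + fullSize(cp)
                    + " sumOfDeltas=" + sumOfDeltas);
        }
    }
}
```

With these made-up numbers the three checkpoints report deltas of 200, 50 and 180 (430 in total), while the third checkpoint actually references only 180, because compaction superseded the earlier files.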

Additionally, I haven't yet seen a good story for monitoring state
serialization and deserialization in a Flink application. The most poorly
written Flink operators seem to perform worst when they require a lot of
serde for each record, so giving visibility into how an application
handles these operations seems like a valuable guard rail for Flink
developers. Does anyone have existing solutions for this, or pointers to
work that could be done to improve this story?
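As a rough idea of the kind of visibility I mean, here is a minimal, hypothetical sketch (the InstrumentedSerde class is made up, not part of Flink) that wraps a serializer/deserializer pair and counts calls and elapsed time. The main method simulates an operator that reads and rewrites a state value for every record, which I assume roughly mirrors a backend that stores state in serialized form.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical helper (not part of Flink): wraps a serializer/deserializer pair
// and records how often each is called and how long the calls take in total.
final class InstrumentedSerde<T> {
    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;
    private final AtomicLong serializeCount = new AtomicLong();
    private final AtomicLong deserializeCount = new AtomicLong();
    private final AtomicLong nanosSpent = new AtomicLong();

    InstrumentedSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    byte[] serialize(T value) {
        long start = System.nanoTime();
        try {
            return serializer.apply(value);
        } finally {
            nanosSpent.addAndGet(System.nanoTime() - start);
            serializeCount.incrementAndGet();
        }
    }

    T deserialize(byte[] bytes) {
        long start = System.nanoTime();
        try {
            return deserializer.apply(bytes);
        } finally {
            nanosSpent.addAndGet(System.nanoTime() - start);
            deserializeCount.incrementAndGet();
        }
    }

    long serializeCalls() { return serializeCount.get(); }
    long deserializeCalls() { return deserializeCount.get(); }
    long totalNanos() { return nanosSpent.get(); }

    /** Average serde calls per processed record (the record count is supplied by the caller). */
    double callsPerRecord(long records) {
        return records == 0 ? 0.0 : (double) (serializeCalls() + deserializeCalls()) / records;
    }

    public static void main(String[] args) {
        InstrumentedSerde<String> serde = new InstrumentedSerde<>(
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
        byte[] stored = new byte[0]; // stands in for a serialized state value
        long records = 1_000;
        for (long i = 0; i < records; i++) {
            String current = serde.deserialize(stored); // read state for this record
            stored = serde.serialize(current + "x");    // write updated state back
        }
        System.out.println("serde calls per record: " + serde.callsPerRecord(records)
                + ", total serde time (ns): " + serde.totalNanos());
    }
}
```

The simulated loop costs one deserialize and one serialize per record, so it reports 2.0 serde calls per record. In a real job, counts like these could presumably be exposed as operator metrics, e.g. counters registered through getRuntimeContext().getMetricGroup().counter(...) in open(). How meaningful they are likely depends on the state backend: as far as I know, RocksDB serializes on every state access, while the heap backend keeps objects on the heap.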

Aaron
