Hey Flink Community,

I'm working on a Flink application where we implement operators that extend the RichFlatMapFunction and RichCoFlatMapFunction classes. As a result, we work directly with Flink's state API (ValueState, ListState, MapState). Being able to monitor the state size of each operator would be extremely valuable. My team has already run into a few cases where our state exploded and jobs failed because YARN killed containers that exceeded their memory limits.
My understanding is that the best way to monitor this is to watch the checkpoint size per operator instance. This gets confusing with incremental checkpointing, because the reported numbers seem to be a delta in state size rather than the actual state size at that point in time. For my team's application, the total state size is not the sum of those deltas. What is the best way to get the total checkpoint size per operator for each checkpoint?

Additionally, I haven't yet seen a good story for monitoring state serialization and deserialization in a Flink application. Poorly written Flink operators seem to perform worst when they require a lot of serde for every record. Visibility into how well an application handles these operations would therefore be a valuable guard rail for Flink developers. Does anyone have existing solutions for this, or pointers to work that could be done to improve this story?

Aaron
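To illustrate why the incremental deltas need not add up to the total state size, here is a toy model. It assumes a simplified scheme loosely based on how RocksDB-backed incremental checkpoints reuse immutable state files: a checkpoint references a set of files, and only files that no earlier checkpoint uploaded count toward its "incremental" size. The class name, file names, and byte counts are all hypothetical, and Flink's real accounting may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of incremental checkpointing (not Flink's API):
// a checkpoint is a set of immutable state files, and only files that no
// earlier checkpoint uploaded count toward the incremental ("delta") size.
class IncrementalCheckpointModel {

    /** Sizes reported for one checkpoint. */
    static final class CheckpointSizes {
        final long incrementalBytes; // bytes newly uploaded by this checkpoint
        final long fullBytes;        // bytes of all files this checkpoint references

        CheckpointSizes(long incrementalBytes, long fullBytes) {
            this.incrementalBytes = incrementalBytes;
            this.fullBytes = fullBytes;
        }
    }

    // Files already uploaded by earlier checkpoints (file name -> size in bytes).
    private final Map<String, Long> uploaded = new HashMap<>();

    /** Checkpoints the given live file set and reports both sizes. */
    CheckpointSizes checkpoint(Map<String, Long> liveFiles) {
        long incremental = 0;
        long full = 0;
        for (Map.Entry<String, Long> file : liveFiles.entrySet()) {
            full += file.getValue();
            // putIfAbsent returns null only if the file was not uploaded before.
            if (uploaded.putIfAbsent(file.getKey(), file.getValue()) == null) {
                incremental += file.getValue();
            }
        }
        return new CheckpointSizes(incremental, full);
    }

    public static void main(String[] args) {
        IncrementalCheckpointModel model = new IncrementalCheckpointModel();
        CheckpointSizes[] cps = {
            // Checkpoint 1: two files, everything is new.
            model.checkpoint(Map.of("sst-1", 100L, "sst-2", 100L)),
            // Checkpoint 2: one new file; sst-1 and sst-2 are reused.
            model.checkpoint(Map.of("sst-1", 100L, "sst-2", 100L, "sst-3", 50L)),
            // Checkpoint 3: compaction merged everything into one smaller file
            // (overwritten/deleted entries dropped); old files are no longer referenced.
            model.checkpoint(Map.of("sst-4", 180L)),
        };
        long sumOfDeltas = 0;
        for (int i = 0; i < cps.length; i++) {
            sumOfDeltas += cps[i].incrementalBytes;
            System.out.printf("checkpoint %d: incremental=%d full=%d%n",
                    i + 1, cps[i].incrementalBytes, cps[i].fullBytes);
        }
        System.out.printf("sum of deltas=%d, actual state=%d%n",
                sumOfDeltas, cps[cps.length - 1].fullBytes);
    }
}
```

In this model the deltas sum to 430 bytes while the latest checkpoint only references 180 bytes, because compaction replaced files that earlier deltas had already counted. The quantity worth alerting on is the full size referenced by each checkpoint, not a running sum of the deltas.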