Hi all,We have an Apache Flink application which generates player sessions based on player events keyed by playerId. Sessions are based on EventTime. A session is created on first event event for that player and closes if there are 30 mins of inactivity. Events are merged in our custom /PlayerSessionAggregator implements AggregateFunction/. We deployed this application on a Flink dev cluster (having checkpoints enabled), however we noted that the state keeps growing until we end up with an out of memory as shown in the attached file /flink_oom_exception.txt/We tried the using the /PurgingTrigger/ together /CountTrigger/ however since it uses /FIRE_AND_PURGE/ we were ending up with a session per event i.e. event were not being merged.Using an /Evictor/ we ended up with same situation because events were being removed from the window. Hence we resorted to using State TTL: We created a /StateTtlConfig/ having an expiry of 120 minutes to periodically remove expired sessions from the state. This /stateTtlConfig/ is passed to the flatMap /PlayerSessionEventMapper extends RichFlatMapFunction/. The /PlayerSessionEventMapper/ has a /ValueStateDescriptor/ to provide access to state per player. This /ValueStateDescriptor/ uses the previously mentioned /stateTtlConfig/ The state per player is updated on each player event. Also we enforce a state access (using / ValueState.value()/) since as per documentation "expired values are only removed when they are read out explicitly, e.g. by calling ValueState.value()" This idea was based on the examples as provided in: https://flink.apache.org/2019/05/19/state-ttl.html
https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state https://www.slideshare.net/FlinkForward/time-tolive-how-to-perform-automatic-state-cleanup-in-apache-flink-andrey-zagrebin-ververica https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively *Code: * PlayerSessionApp.java <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionApp.java> PlayerSessionEventMapper.java <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionEventMapper.java> (Some custom classes have been removed for simplicity reasons) * Our questions are: * are expired session windows automatically removed from state? if not, what's the best way to do it? how can we query state size? how can we query number of windows in state? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/