Hi all,We have an Apache Flink application which generates player sessions
based on player events keyed by playerId. Sessions are based on EventTime. A
session is created on first event event for that player and closes if there
are 30 mins of inactivity. Events are merged in our custom
/PlayerSessionAggregator implements AggregateFunction/. We deployed this
application on a Flink dev cluster (having checkpoints enabled), however we
noted that the state keeps growing until we end up with an out of memory as
shown in the attached file /flink_oom_exception.txt/We tried the using the
/PurgingTrigger/ together /CountTrigger/ however since it uses
/FIRE_AND_PURGE/ we were ending up with a session per event i.e. event were
not being merged.Using an /Evictor/ we ended up with same situation because
events were being removed from the window. Hence we resorted to using State
TTL:
 
We created a /StateTtlConfig/ having an expiry of 120 minutes to
periodically remove expired sessions from the state. 
This /stateTtlConfig/ is passed to the flatMap /PlayerSessionEventMapper
extends RichFlatMapFunction/. 
 The /PlayerSessionEventMapper/ has a /ValueStateDescriptor/ to provide
access to state per player. This /ValueStateDescriptor/ uses the previously
mentioned /stateTtlConfig/ 
 The state per player is updated on each player event. Also we enforce a
state access (using / ValueState.value()/) since as per documentation
"expired values are only removed when they are read out explicitly, e.g. by
calling ValueState.value()" 
This idea was based on the examples as provided in:
 https://flink.apache.org/2019/05/19/state-ttl.html 

https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state
 

https://www.slideshare.net/FlinkForward/time-tolive-how-to-perform-automatic-state-cleanup-in-apache-flink-andrey-zagrebin-ververica
 

https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively
 
*Code: * PlayerSessionApp.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionApp.java>
  
PlayerSessionEventMapper.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionEventMapper.java>
 
(Some custom classes have been removed for simplicity reasons)

* Our questions are: *
are expired session windows automatically removed from state? if not, what's
the best way to do it? 
how can we query state size? 
how can we query number of windows in state? 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to