[ https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455040#comment-17455040 ]
Yun Tang commented on FLINK-14197:
----------------------------------

[~loliver] Do you still see this problem? If the state size is still under 100MB, I think this behavior is by design for the RocksDB state backend. RocksDB relies on level compaction to delete old, outdated data, which means that as long as the state size stays below the level-size thresholds (by default, level-0 compacts at 4 files, level-1 at 256MB and level-2 at 2560MB), you will not see the state size decrease.

> Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14197
>                 URL: https://issues.apache.org/jira/browse/FLINK-14197
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.0
>        Environment: Tested with:
> * Local Flink Mini Cluster running from IDE
> * Flink standalone cluster run in docker
>            Reporter: Oliver Kostera
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following definition:
> {code:java}
> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>         keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>                 .process(new CustomProcessWindowFunction())
>                 .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>                 .name("Process window function");
> {code}
> My checkpointing configuration uses the RocksDB state backend with incremental checkpointing and EXACTLY_ONCE mode.
> At runtime I noticed that even though data ingestion is static (the same keys and the same message frequency), the state size of the process window operator keeps increasing. I reproduced it with a minimal, similar setup here: https://github.com/loliver1234/flink-process-window-function
> Testing conditions:
> - RabbitMQ source with exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only passes messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start the Flink job and check the initial state size - State Size: 127 KB
> - Start sending messages: the same 1000 unique keys every 1s (messages for a key arrive well outside the 100ms session gap, so each message should create a new window)
> - The state of the process window operator keeps increasing - after 1 million messages it ended up at around 2 MB
> - Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few checkpoints go by
> - Expected the state size to decrease back to the base value, but it stayed at 2 MB
> - Continued sending messages with the same keys and the state kept its increasing trend
> What I checked:
> - Registration and deregistration of the timestamps set for the time windows - each registration matched its deregistration
> - Checked that there are in fact no window merges
> - Tried a custom Trigger that disables window merges and returns TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior
> On the staging environment we noticed that the state for that operator keeps increasing indefinitely, after some months reaching 1.5 GB for 100k unique keys.
> Flink commit id: 9c32ed9
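The CustomProcessWindowFunction referenced in the snippet above is not included in the report. Based on the description ("only passes messages through"), a minimal pass-through version could look like the sketch below; the String key type and the use of Message as both input and output type are assumptions about the reporter's setup, not code from the report.

{code:java}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical stand-in for the reporter's CustomProcessWindowFunction:
// it only forwards the windowed elements and keeps no per-window state of its own.
public class CustomProcessWindowFunction
        extends ProcessWindowFunction<Message, Message, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Message> elements,
                        Collector<Message> out) {
        for (Message message : elements) {
            out.collect(message);
        }
    }
}
{code}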
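The checkpointing configuration described in the report can be reproduced roughly as follows. This is a sketch against the Flink 1.9 API; the checkpoint URI and the 10s interval are placeholders, not values taken from the report.

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // RocksDB state backend with incremental checkpoints enabled (second constructor argument)
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));

        // EXACTLY_ONCE checkpointing; the interval is a placeholder
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // ... RabbitMQ source, the windowed transformation from the snippet above, RabbitMQ sink ...

        env.execute("process-window-function-test");
    }
}
{code}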
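Regarding the level-size thresholds mentioned in the comment: below is a sketch of how they could be lowered so that compaction - and with it a shrinking state size - becomes observable even for small state. It assumes the OptionsFactory interface as shipped in Flink 1.9 (newer releases use RocksDBOptionsFactory with a slightly different method signature), and the concrete values are illustrative only, not a tuning recommendation.

{code:java}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Lowers RocksDB's level-compaction thresholds so that compaction is triggered
// even when the total state stays far below the default 256MB level-1 target.
public class SmallStateCompactionOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // keep Flink's defaults for the DB-level options
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                // compact level-0 after 2 files instead of the default 4
                .setLevel0FileNumCompactionTrigger(2)
                // shrink the level-1 target from 256MB to 64MB
                .setMaxBytesForLevelBase(64 * 1024 * 1024);
    }
}
{code}

The factory would be registered on the state backend with backend.setOptions(new SmallStateCompactionOptions()) before handing the backend to the execution environment.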