Oliver Kostera created FLINK-14197:
--------------------------------------

             Summary: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
                 Key: FLINK-14197
                 URL: https://issues.apache.org/jira/browse/FLINK-14197
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / State Backends
    Affects Versions: 1.9.0
         Environment: Tested with:
* Local Flink Mini Cluster running from the IDE
* Flink standalone cluster running in Docker
            Reporter: Oliver Kostera
I'm using a *ProcessWindowFunction* in a keyed stream with the following definition:
{code:java}
final SingleOutputStreamOperator<Message> processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new CustomProcessWindowFunction())
                .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                .name("Process window function");
{code}
My checkpointing configuration uses the RocksDB state backend with incremental checkpointing and EXACTLY_ONCE mode.

At runtime I noticed that even though data ingestion is static (same keys and the same message frequency), the state size of the process window operator keeps increasing. I tried to reproduce it with a minimal similar setup here: https://github.com/loliver1234/flink-process-window-function and was able to do so.

Testing conditions:
- RabbitMQ source with exactly-once guarantee and a 65k prefetch count
- RabbitMQ sink to collect messages
- Simple ProcessWindowFunction that only passes messages through
- Stream time characteristic set to TimeCharacteristic.ProcessingTime

Testing scenario:
- Start the Flink job and check the initial state size - State Size: 127 KB
- Start sending messages: 1000 messages with the same unique keys every 1 s (they do not fall into the defined session gap of 100 ms, so each message should create a new window)
- The state of the process window operator keeps increasing - after 1 million messages the state ended up at around 2 MB
- Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few checkpoints go by
- The state size was expected to decrease back to the base value, but it stayed at 2 MB
- Continue sending messages with the same keys - the state kept its increasing trend
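For completeness, a minimal pass-through CustomProcessWindowFunction consistent with the description above might look like the following sketch; the Message element type and the String key type are assumptions, since the actual class is not shown in this report:
{code:java}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical sketch - the real CustomProcessWindowFunction is not included here.
public class CustomProcessWindowFunction
        extends ProcessWindowFunction<Message, Message, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Message> elements,
                        Collector<Message> out) {
        // Simply forward every buffered element; no custom per-window or
        // per-key state is registered by this function.
        for (Message element : elements) {
            out.collect(element);
        }
    }
}
{code}
With such a pure pass-through function, all window state is managed by the framework itself, which is why a steadily growing operator state is unexpected.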
What I checked:
- Registration and deregistration of the timers set for the time windows - each registration matched its deregistration
- Verified that no window merges actually take place
- Tried a custom Trigger that disables window merging and returns TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior

On our staging environment we noticed that the state of this operator keeps increasing indefinitely, after some months reaching as much as 1.5 GB for 100k unique keys.

Flink commit id: 9c32ed9



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
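As an addendum, the custom Trigger experiment mentioned in the checks above (disabling window merging and firing-and-purging on processing time) could be sketched roughly as follows; the class name FireAndPurgeTrigger and the Message element type are assumptions:
{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical sketch of the custom trigger used during testing.
public class FireAndPurgeTrigger extends Trigger<Message, TimeWindow> {

    @Override
    public TriggerResult onElement(Message element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        // Fire when the session gap elapses on the processing-time clock.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) {
        // Emit the window contents and purge its state immediately.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    // canMerge() is not overridden: the base class default of false
    // disables window merging for this trigger.
}
{code}
Even with FIRE_AND_PURGE, which should remove all window contents on firing, the reported state size kept growing, pointing away from leftover window contents as the cause.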