Hi all,
I'm using ProcessWindowFunction in a keyed stream with the following definition:
final SingleOutputStreamOperator<Message> processWindowFunctionStream =
    keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
        .process(new CustomProcessWindowFunction())
        .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
        .name("Process window function");
My checkpointing configuration uses the RocksDB state backend with
incremental checkpointing and EXACTLY_ONCE mode.
At runtime I noticed that even though data ingestion is steady (same keys
and same message frequency), the state size of the process window operator
keeps increasing. I was able to reproduce it with a minimal, similar setup here:
https://github.com/loliver1234/flink-process-window-function
Testing conditions:
* RabbitMQ source with Exactly-once guarantee and 65k prefetch count
* RabbitMQ sink to collect messages
* Simple ProcessWindowFunction that only passes messages through
* Stream time characteristic set to TimeCharacteristic.ProcessingTime
Testing scenario:
* Start the Flink job and check the initial state size - State Size: 127 KB
* Start sending messages: the same 1000 unique keys every 1 s (consecutive
messages for a key do not fall within the 100 ms session gap, so each message
should create a new window)
* State of the process window operator keeps increasing - after 1 million
messages the state size was around 2 MB
* Stop sending messages and wait until the RabbitMQ queue is fully consumed
and a few checkpoints have completed
* Expected the state size to drop back to the base value, but it stayed at
2 MB
* Continue sending messages with the same keys; the state size kept
increasing
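To make the "each message should create a new window" expectation concrete, here is a minimal plain-Java sketch that does not use Flink at all. The class and method names (SessionWindowSketch, countSessions) are made up for illustration, and the rule that two messages exactly one gap apart still share a session is an assumption about how session windows merge, not something verified against the Flink sources.

```java
import java.util.Arrays;

// Illustrative only: a simplified model of session windowing for ONE key.
// It is not Flink code and does not reproduce Flink's state handling.
class SessionWindowSketch {

    /**
     * Counts how many session windows a single key would produce.
     * A new session starts whenever the distance to the previous message
     * is strictly greater than the gap (assumption: a distance equal to
     * the gap still merges into the current session).
     */
    static int countSessions(long[] timestampsMs, long gapMs) {
        if (timestampsMs.length == 0) {
            return 0;
        }
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        int sessions = 1; // the first message always opens a session
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] - sorted[i - 1] > gapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // One message per second for the same key, 100 ms session gap,
        // as in the testing scenario above.
        long[] oncePerSecond = {0L, 1000L, 2000L, 3000L};
        System.out.println(countSessions(oncePerSecond, 100L));
    }
}
```

Under this model every message sent 1 s apart ends up in its own window, so no window contents should need to survive once the window has fired.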
What I checked:
* Registration and deregistration of timers set for time windows - each
registration matched its deregistration
* Checked that in fact there are no window merges
* Tried a custom Trigger that disables window merges and returns
TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior
Tested with:
* Local Flink Mini Cluster running from IDE
* Flink HA standalone cluster running in Docker
On our staging environment, we noticed that the state of that operator keeps
increasing indefinitely; after some months it reached 1.5 GB for 100k unique
keys.
With best regards
Oliwer