Hi Oliwer,

From the description, it seems the window state is not being cleared. Maybe you could check how many times {{windowState.clear()}} is triggered in {{WindowOperator#processElement}}, and try to figure out why the state is not cleared.
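One way to observe this from user code, without patching Flink internals, is to override {{ProcessWindowFunction#clear(Context)}}: the window operator calls it when a window's state is cleaned up, so if the number of {{clear()}} calls lags behind the number of windows fired, per-window state is accumulating. Below is a minimal sketch of a pass-through function with such logging; the {{Message}} element type and the {{String}} key type are assumptions based on your snippet, and the class name is made up:

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CountingProcessWindowFunction
            extends ProcessWindowFunction<Message, Message, String, TimeWindow> {

        private static final Logger LOG =
                LoggerFactory.getLogger(CountingProcessWindowFunction.class);

        @Override
        public void process(String key, Context context,
                            Iterable<Message> elements, Collector<Message> out) {
            // Pass-through, as in your CustomProcessWindowFunction.
            for (Message message : elements) {
                out.collect(message);
            }
        }

        @Override
        public void clear(Context context) {
            // Called when the window's state is cleaned up. If these log
            // lines lag behind the number of windows fired, per-window
            // state is not being released.
            LOG.debug("clear() called for window {}", context.window());
        }
    }

Also note that with incremental RocksDB checkpoints the reported state size can include SST files that RocksDB has not compacted away yet, so some growth beyond the live state is expected until compaction catches up.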
Best,
Congxian

Oliwer Kostera <o.kost...@adbglobal.com> wrote on Fri, Sep 27, 2019 at 4:14 PM:

> Hi all,
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following
> definition:
>
>     final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>         keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>             .process(new CustomProcessWindowFunction())
>             .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>             .name("Process window function");
>
> My checkpointing configuration uses the RocksDB state backend with
> incremental checkpointing and EXACTLY_ONCE mode.
>
> At runtime I noticed that even though data ingestion is static - the same
> keys and the same message frequency - the state of the process window
> operator keeps growing. I was able to reproduce it with a minimal similar
> setup here: https://github.com/loliver1234/flink-process-window-function
>
> Testing conditions:
>
> - RabbitMQ source with exactly-once guarantee and a 65k prefetch count
> - RabbitMQ sink to collect messages
> - A simple ProcessWindowFunction that only passes messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
>
> Testing scenario:
>
> - Start the Flink job and check the initial state size - State Size: 127 KB
> - Start sending messages: 1000 distinct keys every 1 s (per key, the
>   messages are spaced wider than the 100 ms session gap, so each message
>   should create a new window)
> - The state of the process window operator keeps increasing - after 1
>   million messages it reached around 2 MB
> - Stop sending messages and wait until the RabbitMQ queue is fully
>   consumed and a few checkpoints go by
> - Expected the state size to drop back to the base value, but it stayed
>   at 2 MB
> - Continued sending messages with the same keys; the state kept its
>   increasing trend
>
> What I checked:
>
> - Registration and deregistration of the timers set for the time windows -
>   each registration matched its deregistration
> - Confirmed that there are in fact no window merges
> - Tried a custom Trigger that disables window merges and returns
>   TriggerResult.FIRE_AND_PURGE from onProcessingTime (a sketch of such a
>   trigger follows after this message) - same state behavior
>
> Tested with:
>
> - A local Flink MiniCluster run from the IDE
> - A Flink HA standalone cluster run in Docker
>
> On our staging environment we noticed that the state of that operator
> keeps growing indefinitely, after some months reaching 1.5 GB for 100k
> unique keys.
>
> With best regards
>
> Oliwer
> adbglobal.com <https://www.adbglobal.com>
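For reference, the custom non-merging FIRE_AND_PURGE trigger mentioned under "What I checked" could look roughly like the sketch below. This is an illustration with a made-up class name, not Oliwer's actual code; note that ProcessingTimeSessionWindows is a merging assigner, so a trigger whose canMerge() returns false (the default for Trigger subclasses) can only be used once window merging is disabled, as described above.

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class FireAndPurgeProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) {
            // Fire once processing time passes the end of the window.
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window,
                                              TriggerContext ctx) {
            // FIRE_AND_PURGE emits the window contents and clears windowState.
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window,
                                         TriggerContext ctx) {
            // Processing-time job: event-time timers are never registered.
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }
    }

It would be attached with keyedStream.window(...).trigger(new FireAndPurgeProcessingTimeTrigger()).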