Hi Oliwer,

I think you are right; something seems to be going wrong. Just to clarify: are you sure that the growing state size is caused by the window operator?

From your description, I assume that the state size does not depend (solely) on the number of distinct keys. Otherwise, the state size would stop growing at some point. This would be a hint that every window leaves some state behind. AFAIK, processing-time session windows are not very common, and there might be a bug in the implementation.

Could you create a Jira issue describing the problem? It would be great if you could provide a reproducible example with a data generator source.

Thank you,
Fabian

On Tue, Oct 1, 2019 at 11:18 AM Oliwer Kostera <o.kost...@adbglobal.com> wrote:

> Hi,
>
> I'm not sure what you mean by windowState.clear(). If I understand you
> correctly, it's the windowState from the ProcessWindowFunction Context, which
> is a KeyedStateStore. A KeyedStateStore manages registered keyed states, and I
> don't register any, so without a state descriptor there is nothing on which I
> could call clear(). There is no state that I manage explicitly, as you can see here:
> https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java
>
> With best regards
>
> Oliwer
>
> On 01.10.2019 07:48, Congxian Qiu wrote:
>
> Hi Oliwer,
>
> From the description, it seems the state was not cleared. Maybe you could
> check how many times {{windowState.clear()}} is triggered in
> {{WindowOperator#processElement}} and try to figure out why the state
> was not cleared.
>
> Best,
> Congxian
>
>
> On Fri, Sep 27, 2019 at 4:14 PM Oliwer Kostera <o.kost...@adbglobal.com> wrote:
>
>> Hi all,
>>
>> I'm using a *ProcessWindowFunction* in a keyed stream with the following
>> definition:
>>
>> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>>     keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>>         .process(new CustomProcessWindowFunction())
>>         .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>>         .name("Process window function");
>>
>> My checkpointing configuration uses the RocksDB state backend with
>> incremental checkpointing and EXACTLY_ONCE mode.
>>
>> At runtime I noticed that even though data ingestion is static (same keys
>> and same message frequency), the state size of the process window operator
>> keeps increasing. I tried to reproduce it with a minimal, similar setup here:
>> https://github.com/loliver1234/flink-process-window-function and succeeded.
>>
>> Testing conditions:
>>
>> - RabbitMQ source with an exactly-once guarantee and a prefetch count of 65k
>> - RabbitMQ sink to collect messages
>> - Simple ProcessWindowFunction that only passes messages through
>> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
>>
>> Testing scenario:
>>
>> - Start the Flink job and check the initial state size: 127 KB
>> - Start sending messages: the same 1000 unique keys every 1 s (consecutive
>>   messages for a key do not fall within the 100 ms session gap, so each
>>   message should create a new window)
>> - The state of the process window operator keeps increasing: after 1 million
>>   messages, it ended up at around 2 MB
>> - Stop sending messages and wait until the RabbitMQ queue is fully consumed
>>   and a few checkpoints have gone by
>> - I expected the state size to decrease to the base value, but it stayed
>>   at 2 MB
>> - Continue sending messages with the same keys: the state kept its
>>   increasing trend.
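The expectation behind the testing scenario above (a 1 s send interval per key exceeds the 100 ms session gap, so every message should get its own window, and no window should remain open once input stops) can be checked with a simplified model in plain Java. This is a hypothetical sketch of gap-based session assignment, not Flink's WindowOperator or ProcessingTimeSessionWindows implementation; `SessionGapModel`, `process` and `advanceTo` are made-up names, and processing-time timers are approximated by advancing the clock explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of processing-time session windows (NOT Flink's
 * WindowOperator): each key has at most one open session, an element arriving
 * within gapMs of the previous one extends it, and a session is fired and purged
 * once gapMs of processing time has passed without a new element.
 */
public class SessionGapModel {

    private final long gapMs;
    // Processing time of the last element seen by each key's open session.
    private final Map<String, Long> lastSeen = new HashMap<>();
    private int firedWindows = 0;

    public SessionGapModel(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Fires and drops every session whose gap has elapsed by processing time now. */
    public void advanceTo(long now) {
        lastSeen.entrySet().removeIf(e -> {
            boolean expired = now >= e.getValue() + gapMs;
            if (expired) {
                firedWindows++;
            }
            return expired;
        });
    }

    /** Handles one element for key arriving at processing time now. */
    public void process(String key, long now) {
        advanceTo(now);
        // Opens a new session, or extends the open one if the element fell inside the gap.
        lastSeen.put(key, now);
    }

    public int openWindows() {
        return lastSeen.size();
    }

    public int firedWindows() {
        return firedWindows;
    }

    public static void main(String[] args) {
        SessionGapModel model = new SessionGapModel(100); // 100 ms session gap
        int keys = 1000;
        int seconds = 10;
        for (int s = 0; s < seconds; s++) {
            long now = s * 1000L; // the same 1000 keys once per second
            for (int k = 0; k < keys; k++) {
                model.process("key-" + k, now);
            }
        }
        model.advanceTo(seconds * 1000L); // idle period: let the last sessions time out
        System.out.println("fired=" + model.firedWindows() + " open=" + model.openWindows());
    }
}
```

Running `main` prints `fired=10000 open=0`: each of the 10,000 messages ends up in its own window, and nothing stays open after the idle period. In this model, that corresponds to the expectation that the operator's state should shrink back once input stops.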
>>
>> What I checked:
>>
>> - Registration and deregistration of the timers set for the time windows:
>>   each registration matched its deregistration
>> - That there are in fact no window merges
>> - A custom Trigger that disables window merges and returns
>>   TriggerResult.FIRE_AND_PURGE from onProcessingTime: same state behavior
>>
>> Tested with:
>>
>> - Local Flink Mini Cluster running from the IDE
>> - Flink HA standalone cluster running in Docker
>>
>> In our staging environment, we noticed that the state of that operator keeps
>> increasing indefinitely, reaching as much as 1.5 GB after some months for 100k
>> unique keys.
>>
>> With best regards
>>
>> Oliwer
>> adbglobal.com <https://www.adbglobal.com>
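Fabian's argument (state that is bounded by the number of distinct keys would stop growing, so unbounded growth hints that every window leaves something behind) can be illustrated with a toy model in plain Java. The model assumes, as in the testing scenario, that each of 1000 keys opens one new session window per round. Nothing here is Flink code: `StateGrowthModel` and `residualEntries` are hypothetical names, and a "leftover entry" stands in for whatever state might not be cleaned up.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model (hypothetical, not Flink code) of the reasoning in the thread:
 * leftover state scoped only to keys plateaus at the number of distinct keys,
 * while a leftover entry per fired window grows with the number of windows.
 */
public class StateGrowthModel {

    /** Leftover entries after `rounds` rounds in which each of `keys` keys opens one window. */
    public static int residualEntries(int keys, int rounds, boolean leakPerWindow) {
        Set<String> leftovers = new HashSet<>();
        for (int r = 0; r < rounds; r++) {
            for (int k = 0; k < keys; k++) {
                // The send interval exceeds the session gap, so every message opens window r for key k.
                leftovers.add(leakPerWindow
                        ? "key-" + k + "/window-" + r  // something scoped to the window is never removed
                        : "key-" + k);                 // only per-key state survives the window
            }
        }
        return leftovers.size();
    }

    public static void main(String[] args) {
        for (int rounds : new int[] {1, 10, 100}) {
            System.out.printf("rounds=%d perKey=%d perWindow=%d%n",
                    rounds, residualEntries(1000, rounds, false), residualEntries(1000, rounds, true));
        }
    }
}
```

The per-key count stays at 1000 however many rounds are sent, while the per-window count grows linearly (1000, 10000, 100000). An operator whose state keeps growing under a fixed key set behaves like the second case. The model does not identify which state, if any, the window operator actually leaves behind.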