Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify: are you sure that the growing state size is caused by the
window operator?

From your description I assume that the state size does not depend (solely)
on the number of distinct keys.
Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.

AFAIK, processing time session windows are not very common. There might be
a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great if you could provide a reproducible example with a data
generator source.
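
Something along these lines would already be enough (the class name, key count,
and rate below are just placeholders, not a prescription):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Minimal sketch of a data generator source: 1000 keys, one message per key per second. */
public class KeyedMessageGenerator implements SourceFunction<Tuple2<String, Long>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        long round = 0;
        while (running) {
            // emit one message per key; with a 100ms session gap and 1s pauses,
            // every message should open (and later close) its own window
            for (int key = 0; key < 1000; key++) {
                ctx.collect(Tuple2.of("key-" + key, round));
            }
            round++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}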

Thank you,
Fabian

On Tue, Oct 1, 2019 at 11:18 AM Oliwer Kostera <
o.kost...@adbglobal.com> wrote:

> Hi,
>
> I'm not sure what you mean by windowState.clear(). If I understand you
> correctly, it's the windowState from the ProcessWindowFunction Context,
> which is a KeyedStateStore. The KeyedStateStore manages registered keyed
> states, which I don't have, so without a descriptor I can't access any
> clear() method. There is no state that I manage explicitly, as you can see
> here:
> https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java
>
> With best regards
>
> Oliwer
> On 01.10.2019 07:48, Congxian Qiu wrote:
>
> Hi Oliwer,
>
> From the description, it seems the state is not being cleared. Maybe you
> could check how many times {{windowState.clear()}} is triggered in
> {{WindowOperator#processElement}} and try to figure out why the state is
> not cleared.
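>
> As a rough check from user code (only a sketch - the key and message types
> below are placeholders for your actual types), you could also count how
> often the ProcessWindowFunction's clear() hook is invoked; it is called when
> the window's state is cleaned up, so the count should roughly follow the
> number of closed windows:
>
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class CountingProcessWindowFunction
>         extends ProcessWindowFunction<Message, Message, String, TimeWindow> {
>
>     private static final AtomicLong CLEARED = new AtomicLong();
>
>     @Override
>     public void process(String key, Context ctx, Iterable<Message> elements,
>                         Collector<Message> out) {
>         // pass-through, as in the linked example
>         for (Message m : elements) {
>             out.collect(m);
>         }
>     }
>
>     @Override
>     public void clear(Context ctx) {
>         // invoked when the window state is cleaned up
>         long n = CLEARED.incrementAndGet();
>         if (n % 10_000 == 0) {
>             System.out.println("windows cleared so far: " + n);
>         }
>     }
> }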
>
> Best,
> Congxian
>
>
> Oliwer Kostera <o.kost...@adbglobal.com> wrote on Fri, Sep 27, 2019 at 4:14 PM:
>
>> Hi all,
>>
>>
>> I'm using *ProcessWindowFunction* in a keyed stream with the following
>> definition:
>>
>> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>>         keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>>                    .process(new CustomProcessWindowFunction())
>>                    .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>>                    .name("Process window function");
>>
>> My checkpointing configuration is set to use RocksDB state backend with
>> incremental checkpointing and EXACTLY_ONCE mode.
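>>
>> Roughly like this, inside a main() that declares throws Exception (the
>> checkpoint path and interval below are placeholders, not the real values):
>>
>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // RocksDB state backend with incremental checkpoints enabled (second argument)
>> env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));
>> env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);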
>>
>> At runtime I noticed that even though data ingestion is static - the same
>> keys and the same message frequency - the state size of the process window
>> operator keeps increasing. I tried to reproduce it with a minimal similar
>> setup here:
>> https://github.com/loliver1234/flink-process-window-function and was able
>> to do so.
>>
>> Testing conditions:
>>
>>    - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
>>    - RabbitMQ sink to collect messages
>>    - Simple ProcessWindowFunction that only passes messages through
>>    - Stream time characteristic set to TimeCharacteristic.ProcessingTime
>>
>> Testing scenario:
>>
>>    - Start flink job and check initial state size - State Size: 127 KB
>>    - Start sending messages: the same 1000 unique keys every 1s (they do
>>    not fall into the defined session window gap of 100ms, so each message
>>    should create a new window)
>>    - The state of the process window operator keeps increasing - after 1
>>    million messages the state ended up at around 2 MB
>>    - Stop sending messages and wait until the RabbitMQ queue is fully
>>    consumed and a few checkpoints go by
>>    - Expected the state size to decrease back to the base value, but it
>>    stayed at 2 MB
>>    - Continued sending messages with the same keys and the state kept its
>>    increasing trend.
>>
>> What I checked:
>>
>>    - Registration and deregistration of timers set for time windows -
>>    each registration matched its deregistration
>>    - Checked that in fact there are no window merges
>>    - Tried a custom Trigger that disables window merges and returns
>>    TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior
>>    (a sketch of such a trigger follows below)
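>>
>> A trigger of that kind would look roughly like this (a simplified sketch,
>> not the exact code used; note that a merging assigner such as session
>> windows requires a trigger that reports canMerge(), so here the merge is
>> made a no-op rather than rejected outright):
>>
>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>
>> public class PurgingSessionTrigger extends Trigger<Object, TimeWindow> {
>>
>>     @Override
>>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
>>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>>         return TriggerResult.FIRE_AND_PURGE; // emit and drop the window contents
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(TimeWindow window, TriggerContext ctx) {
>>         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>>     }
>>
>>     @Override
>>     public boolean canMerge() {
>>         return true;
>>     }
>>
>>     @Override
>>     public void onMerge(TimeWindow window, OnMergeContext ctx) {
>>         // effectively no merge logic: just re-register the timer for the merged window
>>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>>     }
>> }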
>>
>> Tested with:
>>
>>    - Local Flink MiniCluster running from the IDE
>>    - Flink HA standalone cluster run in Docker
>>
>> On a staging environment, we noticed that the state for that operator keeps
>> increasing indefinitely, after some months reaching as much as 1.5 GB for
>> 100k unique keys.
>>
>> With best regards
>>
>> Oliwer
>> adbglobal.com <https://www.adbglobal.com>
>>
>
