Hi Vishal,

Sorry for the late reply. Please find my answers below.
By "state" I assume you mean the state obtained via getRuntimeContext()
(per-window state is not allowed with merging windows).

> The state is scoped to the key (created per key in the ProcessWindowFunction 
> with a ttl )
Yes.

> The state will remain alive irrespective of whether the Window is closed or 
> not (a TTL timer does the collection )
Right, but you need to configure the TTL on the state descriptor that you
use to access the state [1].
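
In Flink the TTL is set by building a StateTtlConfig and passing it to
enableTimeToLive() on the state descriptor, as described in [1]. To make the
resulting behaviour concrete, here is a minimal sketch of the semantics only.
It is a toy model in plain Java, not Flink code: the class name, the method
names and the "expire N ms after the last write, never return expired values"
policy are assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of a per-key value that expires after a TTL. */
class TtlKeyedValueModel {
    /** A stored value together with the time it was last written. */
    private static final class Entry {
        final String value;
        final long lastWriteMillis;

        Entry(String value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> byKey = new HashMap<>();

    TtlKeyedValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Every write refreshes the timestamp, so the TTL restarts. */
    void update(String key, String value, long nowMillis) {
        byKey.put(key, new Entry(value, nowMillis));
    }

    /** Returns null once the entry has expired and drops it lazily. */
    String value(String key, long nowMillis) {
        Entry entry = byKey.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMillis - entry.lastWriteMillis >= ttlMillis) {
            byKey.remove(key);
            return null;
        }
        return entry.value;
    }

    /** Number of keys that still hold an entry. */
    int size() {
        return byKey.size();
    }

    public static void main(String[] args) {
        TtlKeyedValueModel state = new TtlKeyedValueModel(100);
        state.update("user-1", "session-uid", 0);
        System.out.println(state.value("user-1", 50));  // prints session-uid
        System.out.println(state.value("user-1", 100)); // prints null
    }
}
```

Note that nothing in the model refers to a window: the entry outlives any
window that wrote it and disappears only when the TTL elapses.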

>  The execution on a key is sequential , as in if 2 events arrive for the 2 
> Sessions they happen sequentially ( or in any order but without the need of 
> synchronization )
Right.

> The state mutated by an event in Session A, will be visible to Session B if 
> an event incident on Session B was to happen subsequently.  There is no need 
> of synchronizing access to the state as it is for the same key.
Right.
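
As a rough illustration of that last point, here is a toy model in plain Java
(not Flink code; the class and method names are made up for this sketch). It
mirrors the logic of your process() method: the ID is created on first use
and stored per key, so two sessions of the same key see the same value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model (not Flink code): one value per key, shared by all windows of that key. */
class KeyScopedSessionIdModel {
    private final Map<String, String> idByKey = new HashMap<>();

    /**
     * Creates the ID on first use and reuses it afterwards.
     * The window name is deliberately ignored, because the state is
     * scoped to the key and not to window+key.
     */
    String idFor(String key, String windowName) {
        return idByKey.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        KeyScopedSessionIdModel state = new KeyScopedSessionIdModel();
        String fromSessionA = state.idFor("user-1", "A");
        String fromSessionB = state.idFor("user-1", "B");
        System.out.println(fromSessionA.equals(fromSessionB)); // prints true
    }
}
```

The model uses no locking, which matches the point above: events for one key
are processed one after another, so access needs no synchronization.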

Your understanding of merging of window contents is also correct.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman


On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
<vishal.santo...@gmail.com> wrote:
>
> I had a query. Say I have a single key with 2 live sessions (A and B) with 
> a configured lateness.
>
> Do these invariants hold?
>
> * The state is scoped to the key (created per key in the 
> ProcessWindowFunction with a ttl )
> * The state will remain alive irrespective of whether the Window is closed or 
> not (a TTL timer does the collection )
> *  The execution on a key is sequential , as in if 2 events arrive for the 2 
> Sessions they happen sequentially ( or in any order but without the need of 
> synchronization )
> * The state mutated by an event in Session A, will be visible to Session B if 
> an event incident on Session B was to happen subsequently.  There is no need 
> of synchronizing access to the state as it is for the same key.
>
> What I am not sure about is what happens when session A merges with session B. 
> I would assume that it just defines a new start and end for the merged 
> window, GCing the old ones (or at least one of them) and assigning that 
> event to the new window. What one does with the custom state in 
> ProcessWindowFunction (there is a CountTrigger of 1) is really whatever is 
> done in the process method above, as this state is one degree removed from 
> whatever Flink does internally with its merges, given that the state is 
> scoped to the key.
>
>
>
>
>
>
>
> On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vishal.santo...@gmail.com> 
> wrote:
>>
>> Yep, makes sense.
>>
>> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org> wrote:
>>>
>>> > Want to confirm that the keys are GCed ( along with state ) once the  
>>> > (windows close + lateness ) ?
>>> Window state is cleared (as well as the window itself), but global
>>> state is not (unless you use TTL).
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>> <vishal.santo...@gmail.com> wrote:
>>> >
>>> > Sometimes writing it down makes you think. I now realize that this is not 
>>> > the right approach, given that merging windows will have their own 
>>> > states, and how the merge happens is really at the key level.
>>> >
>>> >
>>> >
>>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi 
>>> > <vishal.santo...@gmail.com> wrote:
>>> >>
>>> >> I intend to augment every event in a session  with a unique ID.  To keep 
>>> >> the session lean, there is a PurgingTrigger on this aggregate that  
>>> >> fires on a count of 1.
>>> >>
>>> >> >> (except that the number of keys can grow).
>>> >>
>>> >> Want to confirm that the keys are GCed ( along with state ) once the  
>>> >> (windows close + lateness ) ?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> 
>>> >> wrote:
>>> >>>
>>> >>> Hi Vishal,
>>> >>>
>>> >>> There is no leak in the code you provided (except that the number of
>>> >>> keys can grow).
>>> >>> But as you figured out the state is scoped to key, not to window+key.
>>> >>>
>>> >>> Could you explain what you are trying to achieve and why you need to
>>> >>> combine sliding windows with state scoped to window+key?
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>> >>> <vishal.santo...@gmail.com> wrote:
>>> >>> >
>>> >>> > Essentially, does this code leak state?
>>> >>> >
>>> >>> > private static class SessionIdProcessWindowFunction<
>>> >>> >         KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
>>> >>> >         extends ProcessWindowFunction<
>>> >>> >                 KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>>> >>> >     private static final long serialVersionUID = 1L;
>>> >>> >     private static final ValueStateDescriptor<String> sessionId =
>>> >>> >             new ValueStateDescriptor<String>("session_uid", String.class);
>>> >>> >
>>> >>> >     @Override
>>> >>> >     public void process(
>>> >>> >             KEY key,
>>> >>> >             ProcessWindowFunction<KeyedSession<KEY, VALUE>,
>>> >>> >                     KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>>> >>> >             Iterable<KeyedSession<KEY, VALUE>> elements,
>>> >>> >             Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
>>> >>> >         // I need this scoped to key/window
>>> >>> >         if (getRuntimeContext().getState(sessionId).value() == null) {
>>> >>> >             UUID uuid = UUID.randomUUID();
>>> >>> >             getRuntimeContext().getState(sessionId).update(uuid.toString());
>>> >>> >         }
>>> >>> >         String uuid = getRuntimeContext().getState(sessionId).value();
>>> >>> >         out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>>> >>> >     }
>>> >>> > }
>>> >>> >
>>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi 
>>> >>> > <vishal.santo...@gmail.com> wrote:
>>> >>> >>
>>> >>> >> Hello folks,
>>> >>> >> The suggestion is to use windowState() for per-key, per-window 
>>> >>> >> state and to clear the state explicitly.  Also it seems that 
>>> >>> >> getRuntimeContext().getState() will return a globalWindow() where 
>>> >>> >> state is shared among windows with the same key. I would of course 
>>> >>> >> like to have state scoped to a key per window and wanted to use 
>>> >>> >> windowState(). The caveat is that my window is a Session Window, and 
>>> >>> >> when I try to use clear() I get this exception (Session Windows are 
>>> >>> >> Merging Windows):
>>> >>> >>
>>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state 
>>> >>> >> is not allowed when using merging windows.
>>> >>> >>
>>> >>> >>
>>> >>> >> The questions are
>>> >>> >>
>>> >>> >> * How do I have state per session window / per key and still be 
>>> >>> >> able to clear it?
>>> >>> >> * Does getRuntimeContext().getState() give me the clear() semantics 
>>> >>> >> for free along with state per window per key, and have I thus 
>>> >>> >> misunderstood getRuntimeContext().getState()?
>>> >>> >>
>>> >>> >> Regards.
>>> >>> >>
>>> >>> >>
>>> >>> >>
