Hi Vishal, Sorry for the late reply, Please find my answers below. By state I assume the state obtained via getRuntimeContext (access to window state is not allowed)..
> The state is scoped to the key (created per key in the ProcessWindowFunction > with a ttl ) Yes. > The state will remain alive irrespective of whether the Window is closed or > not (a TTL timer does the collection ) Right, but you need to configure TTL when accessing the state [1] > The execution on a key is sequential , as in if 2 events arrive for the 2 > Sessions they happen sequentially ( or in any order but without the need of > synchronization ) Right. > The state mutated by an event in Session A, will be visible to Session B if > an event incident on Session B was to happen subsequently. There is no need > of synchronizing access to the state as it for the same key. Right. Your understanding of merging of window contents is also correct. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl Regards, Roman On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote: > > I had a query Say I have a single key with 2 live sessions ( A and B ) with > a configured lateness . > > Do these invariants hold? > > * The state is scoped to the key (created per key in the > ProcessWindowFunction with a ttl ) > * The state will remain alive irrespective of whether the Window is closed or > not (a TTL timer does the collection ) > * The execution on a key is sequential , as in if 2 events arrive for the 2 > Sessions they happen sequentially ( or in any order but without the need of > synchronization ) > * The state mutated by an event in Session A, will be visible to Session B if > an event incident on Session B was to happen subsequently. There is no need > of synchronizing access to the state as it for the same key. > > What I am not sure about is what happens when session A merge with session B. > I would assume that it just is defining new start and end of the merged > window, Gcing the old ones ( or at least one of them ) and assigning that > even to that new window. What one does with the custom state in > ProcessWindowFunction ( there is a CountTrigger of 1 ) , really what is done > in the process method above, As in this state is 1 degree removed from what > ever flink does internally with it's merges given that the state is scoped to > the key. > > > > > > > > On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <vishal.santo...@gmail.com> > wrote: >> >> Yep, makes sense. >> >> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <ro...@apache.org> wrote: >>> >>> > Want to confirm that the keys are GCed ( along with state ) once the >>> > (windows close + lateness ) ? >>> Window state is cleared (as well as the window itself), but global >>> state is not (unless you use TTL). >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl >>> >>> Regards, >>> Roman >>> >>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi >>> <vishal.santo...@gmail.com> wrote: >>> > >>> > Sometimes writing it down makes you think. I now realize that this is not >>> > the right approach, given that merging windows will have their own >>> > states..and how the merge happens is really at the key level.... >>> > >>> > >>> > >>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi >>> > <vishal.santo...@gmail.com> wrote: >>> >> >>> >> I intend to augment every event in a session with a unique ID. To keep >>> >> the session lean, there is a PurgingTrigger on this aggregate that >>> >> fires on a count of 1. >>> >> >>> >> >> (except that the number of keys can grow). >>> >> >>> >> Want to confirm that the keys are GCed ( along with state ) once the >>> >> (windows close + lateness ) ? >>> >> >>> >> >>> >> >>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <ro...@apache.org> >>> >> wrote: >>> >>> >>> >>> Hi Vishal, >>> >>> >>> >>> There is no leak in the code you provided (except that the number of >>> >>> keys can grow). >>> >>> But as you figured out the state is scoped to key, not to window+key. >>> >>> >>> >>> Could you explain what you are trying to achieve and why do you need to >>> >>> combine >>> >>> sliding windows with state scoped to window+key? >>> >>> >>> >>> Regards, >>> >>> Roman >>> >>> >>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi >>> >>> <vishal.santo...@gmail.com> wrote: >>> >>> > >>> >>> > Essentially, Does this code leak state >>> >>> > >>> >>> > private static class SessionIdProcessWindowFunction<KEY extends >>> >>> > java.io.Serializable, VALUE extends java.io.Serializable> >>> >>> > extends >>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>, >>> >>> > KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> { >>> >>> > private static final long serialVersionUID = 1L; >>> >>> > private final static ValueStateDescriptor<String> sessionId = new >>> >>> > ValueStateDescriptor<String>("session_uid", >>> >>> > String.class); >>> >>> > >>> >>> > @Override >>> >>> > public void process(KEY key, >>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>, >>> >>> > KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context >>> >>> > context, >>> >>> > Iterable<KeyedSession<KEY, VALUE>> elements, >>> >>> > Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) >>> >>> > throws Exception { >>> >>> > // I need this scoped to key/window >>> >>> > if (getRuntimeContext().getState(sessionId).value() == null) { >>> >>> > UUID uuid = UUID.randomUUID(); >>> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString()); >>> >>> > } >>> >>> > String uuid = getRuntimeContext().getState(sessionId).value(); >>> >>> > out.collect(new >>> >>> > KeyedSessionWithSessionID<>(elements.iterator().next(), uuid)); >>> >>> > } >>> >>> > } >>> >>> > >>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi >>> >>> > <vishal.santo...@gmail.com> wrote: >>> >>> >> >>> >>> >> Hello folks, >>> >>> >> The suggestion is to use windowState() for a key >>> >>> >> key per window state and clear the state explicitly. Also it seems >>> >>> >> that getRuntime().getState() will return a globalWindow() where >>> >>> >> state is shared among windows with the same key. I desire of course >>> >>> >> to have state scoped to a key per window and was wanting to use >>> >>> >> windowState().. The caveat is that my window is a Session Window and >>> >>> >> when I try to use clear() I am thrown this exception ( Session >>> >>> >> Windows are Merging Windows ) >>> >>> >> >>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state >>> >>> >> is not allowed when using merging windows. >>> >>> >> >>> >>> >> >>> >>> >> The questions are >>> >>> >> >>> >>> >> * How do I have state per session window/ per key and still be able >>> >>> >> to clear it ? >>> >>> >> * Does getRuntime().getState() give me the clear() semantics for >>> >>> >> free along with state per window per key and thus I have understood >>> >>> >> getRuntime().getState() wrong ? >>> >>> >> >>> >>> >> Regards. >>> >>> >> >>> >>> >> >>> >>> >>