Hello Navneeth,

I would agree with Liam that a session store seems a good fit for your case. Note, though, that a session store will not expire sessions by itself: it is still the processor node's job to find the already-expired sessions, emit results for them, and delete them. You can take a look at KStreamSessionWindowAggregate in the Kafka code base (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java) for reference.
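For illustration only, here is a rough, untested PAPI sketch of that pattern using the classic Processor API. It is not the actual KStreamSessionWindowAggregate code; the store name "user-sessions", the Long "last seen" value, and the open-ended (null) key bounds in the punctuator scan are all assumptions for the example (SessionWindow is an internal class, the same one the referenced code uses):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

public class UserInactivityProcessor implements Processor<String, String> {

    private static final long GAP_MS = Duration.ofMinutes(10).toMillis();

    private ProcessorContext context;
    private SessionStore<String, Long> sessions;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.sessions = (SessionStore<String, Long>) context.getStateStore("user-sessions");

        // The store never expires sessions on its own; this punctuator is the
        // "processor node's job" part: find sessions that ended more than the
        // gap ago, emit the inactivity event, and delete them.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            final long cutoff = now - GAP_MS;
            // Assumes the store implementation accepts null (open-ended) key bounds;
            // otherwise explicit key bounds or a secondary index would be needed.
            try (KeyValueIterator<Windowed<String>, Long> candidates =
                         sessions.findSessions(null, null, 0L, cutoff)) {
                while (candidates.hasNext()) {
                    final KeyValue<Windowed<String>, Long> entry = candidates.next();
                    // findSessions cannot filter on session end time alone, so check it here.
                    if (entry.key.window().end() <= cutoff) {
                        context.forward(entry.key.key(), "inactive");
                        sessions.remove(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final String userId, final String value) {
        // Merge this record into any existing session within the inactivity gap,
        // the same pattern KStreamSessionWindowAggregate uses.
        final long ts = context.timestamp();
        long start = ts;
        long end = ts;
        try (KeyValueIterator<Windowed<String>, Long> overlapping =
                     sessions.findSessions(userId, ts - GAP_MS, ts + GAP_MS)) {
            while (overlapping.hasNext()) {
                final Windowed<String> existing = overlapping.next().key;
                start = Math.min(start, existing.window().start());
                end = Math.max(end, existing.window().end());
                sessions.remove(existing);
            }
        }
        sessions.put(new Windowed<>(userId, new SessionWindow(start, end)), end);
    }

    @Override
    public void close() { }
}

The store itself would be created with something like Stores.sessionStoreBuilder(Stores.persistentSessionStore("user-sessions", ...), ...) and attached to this processor when building the topology; its retention only needs to comfortably cover the 10 minute gap plus the punctuation interval.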
Guozhang

On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

> Hmmm, thanks Navneeth,
>
> I feel like a session store set to an inactivity period of 10 minutes, suppressed until the session window closes, combined with a GlobalKTable, would be how I'd start to approach this in the DSL, with the below topology. I have no idea if my ASCII art below will survive email formatting, so I'll try to explain. User ids stream into the GlobalKTable, and also into the session store. After 10 minutes of inactivity for a given user id key, the session expires, and the session store emits the user_id -> some_value. I'd then map the some_value to null, to take advantage of KTable semantics where `k -> null` is treated as a delete for key k, so an inactive user would be deleted from the KTable. You could then periodically query the KTable's key-value store for outside emission.
>
> That said, this is only how I'd start to explore the problem, and there are obvious questions that need to be answered first, like how much state you would end up storing in the session store, etc. I'm hoping someone like John Roesler, who has far better insights into Kafka Streams, might weigh in here.
>
> user ids ------------------------------------------------------>
> globalktable <---- keyValueStore periodically queried.
>   \------------> session store ----> map (user_id -> null) --/
>
> Good luck,
>
> Liam
>
> On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>
> > Hi Liam,
> >
> > The use case is to stream all data and send it to storage after processing. Also, when the user is inactive for a 10 min period, send a special event that marks the user as inactive. I'm trying to implement the special event here.
> >
> > Thanks
> >
> > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> >
> > > Hey Navneeth,
> > >
> > > So to understand your problem better - do you only want to stream users active within 10 minutes to storage?
> > >
> > > Cheers,
> > >
> > > Liam
> > >
> > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> > >
> > > > It's just for emitting to data storage. There is no join here.
> > > >
> > > > Thanks
> > > >
> > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hi Navneeth,
> > > > >
> > > > > What is the purpose of holding these user records? Is it to join against other streams, or to emit to data storage?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Liam Clarke-Hutchinson
> > > > >
> > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <reachnavnee...@gmail.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I have a question about how I can use window stores to achieve this use case. Thanks for all the help.
> > > > > >
> > > > > > A user record will be created when the user first logs in, and the record needs to be cleaned up after 10 mins of inactivity. Thus for each user there will be a TTL, but the TTL value will be updated each time the user is active, until he becomes inactive for the entire 10 min period. We are currently using PAPI for all our topologies and I was thinking of implementing it using a punctuator.
> > > > > >
> > > > > > My initial logic was to have a KV store with each user as the key and the TTL as the value, and run a scheduled task every minute that looks at all the records whose TTL value is less than the current timestamp. But the problem with this approach was performance: when there are more than 1M records it takes more than a few seconds to complete this task.
> > > > > >
> > > > > > The next approach is to have a window store and a KV store. The window store will have each user and the corresponding TTL rounded to the nearest minute. Then find all keys between the current time and current time - 1 min, iterate these keys, and use the KV store to find whether the TTL value is still the same or whether we have received any updates after that. If not, then the user will be evicted.
> > > > > >
> > > > > > What would be a better and much more scalable solution for this?
> > > > > >
> > > > > > Thanks

--
-- Guozhang
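For illustration, a rough, untested DSL sketch of the topology Liam describes above: the activity stream feeds a table of active users, and a session-windowed, suppressed branch of the same stream emits a null tombstone once a user has been inactive for 10 minutes, deleting that user from the table. The topic names ("user-events", "active-users"), the grace period choice, and the assumption of default String serdes are illustrative only, not part of the thread:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;

public class InactiveUserTopology {

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // User activity events keyed by user id (default String serdes assumed).
        final KStream<String, String> events = builder.stream("user-events");

        // Every event (re)asserts the user as active in the table topic.
        events.to("active-users");

        // Session-window the same stream with a 10 minute inactivity gap and a
        // matching grace period, suppressed until the window closes, so each
        // session produces exactly one record once stream time passes 10
        // minutes after the user's last event.
        events.groupByKey()
              .windowedBy(SessionWindows.with(Duration.ofMinutes(10)).grace(Duration.ofMinutes(10)))
              .count()
              .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
              .toStream()
              // k -> null is a tombstone under table semantics, so the
              // now-inactive user is deleted from the table built below.
              .map((windowedUserId, count) -> KeyValue.pair(windowedUserId.key(), (String) null))
              .to("active-users");

        // Table of currently active users; its state store can be queried
        // periodically via interactive queries.
        final GlobalKTable<String, String> activeUsers = builder.globalTable("active-users");

        return builder;
    }
}

Note that suppress() only emits as stream time advances, so if the whole input goes quiet, the inactivity tombstones are also held back until new records arrive.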