Thanks a lot, Guozhang. I will try it and let you know. I really appreciate all the help. This community has been amazing.
Thanks

On Tue, Feb 23, 2021 at 5:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

Sorry I was not very clear before: by "WindowStore" I meant implementing your own customized store based on a kvStore where the key is a combo <timestamp, key>. Note that you should put the timestamp first and the key second in your serialization format, so that you can range-fetch using just the timestamp prefix. In fact, the `WindowStore` that we provide also follows this design principle, but its combo key is <key, timestamp>, so range fetch is not as efficient: you'd need to fetch a much larger range and then filter out a lot of records.

Guozhang

On Tue, Feb 23, 2021 at 4:04 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

Thanks Guozhang.

I don't see the remove method in window stores. Am I missing something? It would be very nice to implement the optimization you had mentioned.

Thanks

On Tue, Feb 23, 2021 at 11:11 AM Guozhang Wang <wangg...@gmail.com> wrote:

I see. In that case I think your design with a KV store plus a book-keeping window store would work better. One minor optimization you can try, though: instead of checking whether the TTL has changed when expiring from the window store, delete from the window store whenever you update the kv-store. More specifically, when you update the kv-store, do something like this:

    value = kvStore.get(k);  // here value also encodes the timestamp, e.g. see the "TimestampedKeyValueStore" interface
    if (value != v)          // v is the new value you want to put
        windowStore.remove(combo-key);  // here the combo-key is <timestamp, key>, where timestamp is extracted from value

    kvStore.put(k, v);
    windowStore.put(combo-key);  // now in <new-timestamp-of-v, key>

Later, when you expire, you do not need to check the kvStore to see whether the value's timestamp has changed.

On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

Thanks Liam & Guozhang.

First of all, we use PAPI in our entire topology and we would like to keep it that way rather than combining it with the DSL. Secondly, I was also leaning more towards a session store, but the problem I found with the session store is that I cannot get all the expired sessions without keys, whereas the window store has the option to get all keys by range. Ideally I would like to have a punctuate function that finds all the expired records and sends them downstream. I looked at KStreamSessionWindowAggregate, but it looks like we need a new value coming in for the key to even send updates. In my case there might not be any activity at all, but I still need to send the delete event.

Here is how we want it to work:

T -> User1 (Active event)
T+5 -> User1 (Active event)
T+15 -> User1 (Delete event, since the user has been inactive for a 10 min period)

Thanks

On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Navneeth,

I would agree with Liam that a session store seems a good fit for your case.
But note that session stores do not expire sessions themselves; it is still the processor node's job to find the sessions that have already expired and emit results / delete them. You can take a look at KStreamSessionWindowAggregate in the Kafka code base (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java) for reference.

Guozhang

On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

Hmmm, thanks Navneeth,

I feel like a session store set to an inactivity period of 10 minutes, suppressed until the session window closes, combined with a GlobalKTable is how I'd start to approach this in the DSL, with the topology below. I have no idea if my ASCII art will survive email formatting, so I'll try to explain. User ids stream into the GlobalKTable and also into the session store. After 10 minutes of inactivity for a given user id key, the session expires, and the session store emits user_id -> some_value. I'd then map some_value to null to take advantage of KTable semantics, where `k -> null` is treated as a delete for key k, so an inactive user would be deleted from the KTable. You could then periodically query the KTable's key-value store for outside emission.
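As a rough illustration of the inactivity-gap semantics Liam describes, here is a plain-Java model of sessionizing one user's activity timestamps with a 10-minute gap. It is a sketch, not Kafka's SessionStore or SessionWindows implementation; `SessionModel`, `sessionize`, and `closesAt` are hypothetical names, and it assumes an event extends a session only if it arrives strictly less than the gap after the previous event.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of inactivity-gap sessionization for one user; NOT Kafka's SessionStore API.
// Assumption: an event extends the current session only if it arrives strictly less than
// `gap` after the session's last event; otherwise the user counts as inactive from
// lastEvent + gap onwards and the next event opens a new session.
class SessionModel {
    static final class Session {
        final long start;     // first event in the session
        final long end;       // last event in the session
        final long closesAt;  // end + gap: when the delete/"inactive" event is due

        Session(long start, long end, long closesAt) {
            this.start = start;
            this.end = end;
            this.closesAt = closesAt;
        }
    }

    // Expects timestamps sorted in ascending order.
    static List<Session> sessionize(long[] sortedTimestamps, long gap) {
        List<Session> sessions = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return sessions;
        }
        long start = sortedTimestamps[0];
        long end = start;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - end < gap) {
                end = ts;  // activity within the gap extends the current session
            } else {
                sessions.add(new Session(start, end, end + gap));  // previous session closed
                start = ts;
                end = ts;
            }
        }
        // Last session; its closing time may still lie in the future.
        sessions.add(new Session(start, end, end + gap));
        return sessions;
    }
}
```

With minutes as the unit, Navneeth's timeline (activity at T = 0 and T+5) gives one session [0, 5] whose `closesAt` is 15, i.e. the delete event at T+15. The model only says when a session closes; as Guozhang points out, the processor still has to find closed sessions and emit the delete, for example from a punctuator that compares `closesAt` with the current time.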

That said, this is only how I'd start to explore the problem, and there are obvious questions that need to be answered first, like how much state you would end up storing in the session store, etc. I'm hoping someone like John Roesler, who has far better insight into Kafka Streams, might weigh in here.

    user ids ------------------------------------------------------> globalktable <---- keyValueStore periodically queried.
       \------------> session store ----> map (user_id -> null) --/

Good luck,

Liam

On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

Hi Liam,

The use case is to stream all data and send it to storage after processing. Also, when the user is inactive for a 10 min period, we need to send a special event that marks the user as inactive. I'm trying to implement the special event here.

Thanks

On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

Hey Navneeth,

So to understand your problem better: do you only want to stream users active within 10 minutes to storage?

Cheers,

Liam

On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

It's just for emitting to data storage. There is no join here.

Thanks

On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:

Hi Navneeth,

What is the purpose of holding these user records? Is it to join against other streams, or to emit to data storage?

Cheers,

Liam Clarke-Hutchinson

On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <reachnavnee...@gmail.com> wrote:

Hi All,

I have a question about how I can use window stores to achieve this use case. Thanks for all the help.

A user record will be created when the user first logs in, and the record needs to be cleaned up after 10 minutes of inactivity. Thus each user has a TTL, but the TTL value is updated each time the user is active, until the user becomes inactive for an entire 10 min period. We are currently using PAPI for all our topologies, and I was thinking of implementing this using a punctuator.
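The requirement above (a per-user TTL that moves forward on every activity and is expired from a punctuator), combined with Guozhang's suggestion elsewhere in this thread of a key-value store plus a time-ordered index whose stale entry is deleted on each update, can be sketched with plain Java collections. This is an in-memory model under stated assumptions, not Kafka Streams code: a `HashMap` stands in for the key-value store, a `TreeSet` ordered by `<expiry, user>` stands in for the time-ordered store, and `InactivityTracker`, `onActivity`, and `expire` are hypothetical names. It indexes by expiry time (activity time + TTL) rather than by activity time, which only shifts the range bound, and it assumes events older than the last seen activity can be ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the two-store design:
//  - lastSeen    ~ key-value store (user -> last activity time)
//  - expiryIndex ~ time-ordered store keyed by <expiry, user>
// Real code would use Kafka Streams state stores and a wall-clock punctuator.
class InactivityTracker {
    private final long ttlMs;
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final TreeSet<IndexKey> expiryIndex = new TreeSet<>();

    // Combo key ordered by expiry time first, then user id (timestamp-first layout).
    static final class IndexKey implements Comparable<IndexKey> {
        final long expiresAt;
        final String user;

        IndexKey(long expiresAt, String user) {
            this.expiresAt = expiresAt;
            this.user = user;
        }

        @Override
        public int compareTo(IndexKey o) {
            int c = Long.compare(expiresAt, o.expiresAt);
            return c != 0 ? c : user.compareTo(o.user);
        }
    }

    InactivityTracker(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Called for every activity event: delete the stale index entry, then write the new one.
    void onActivity(String user, long timestamp) {
        Long previous = lastSeen.get(user);
        if (previous != null) {
            if (timestamp <= previous) {
                return;  // assumption: ignore duplicate or out-of-order events
            }
            expiryIndex.remove(new IndexKey(previous + ttlMs, user));
        }
        lastSeen.put(user, timestamp);
        expiryIndex.add(new IndexKey(timestamp + ttlMs, user));
    }

    // Punctuator body: every index entry at or before `now` is expired; no staleness
    // check against lastSeen is needed because stale entries were removed on update.
    List<String> expire(long now) {
        List<String> deleted = new ArrayList<>();
        while (!expiryIndex.isEmpty() && expiryIndex.first().expiresAt <= now) {
            IndexKey due = expiryIndex.pollFirst();
            lastSeen.remove(due.user);
            deleted.add(due.user);  // e.g. forward a "user inactive" delete event downstream
        }
        return deleted;
    }
}
```

For the thread's example with a 10-minute TTL: after `onActivity("User1", 0)` and `onActivity("User1", 5)`, `expire(14)` returns nothing because the entry for expiry 10 was removed on the second update, and `expire(15)` returns `[User1]`. Each call to `expire` walks only the entries that are due (plus one peek) instead of scanning every user.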

My initial logic was to have a KV store with each user as the key and the TTL as the value, and to run a scheduled task every minute that looks at all the records whose TTL value is less than the current timestamp. But the problem with this approach was performance: when there are more than 1M records, the task takes more than a few seconds to complete.

The next approach is to have a window store and a KV store. The window store will hold each user and the corresponding TTL rounded to the nearest minute. We then find all keys between the current time - 1 min and the current time, iterate over these keys, and use the KV store to check whether the TTL value is still the same or whether we have received any updates since. If not, the user is evicted.

What would be a better and much more scalable solution for this?

Thanks
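Guozhang's advice to serialize the combo key as <timestamp, key> rather than <key, timestamp> is what turns the "find all keys between the current time - 1 min and the current time" step into a single contiguous range scan. A minimal sketch, assuming the underlying store orders keys by unsigned lexicographic byte comparison (a `TreeMap` with `Arrays::compareUnsigned` stands in for it here) and that timestamps are non-negative epoch milliseconds; `ComboKeys` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Sketch of a timestamp-first combo key. Assumes the store compares keys as unsigned
// byte strings and that timestamps are >= 0 (so big-endian byte order == numeric order).
class ComboKeys {
    // 8-byte big-endian timestamp followed by the UTF-8 user id.
    static byte[] timeFirst(long timestamp, String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length)
                .putLong(timestamp)
                .put(k)
                .array();
    }

    static String keyOf(byte[] combo) {
        return new String(combo, Long.BYTES, combo.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // In-memory stand-in for a byte-ordered store.
    static TreeMap<byte[], String> newStore() {
        Comparator<byte[]> unsignedLexicographic = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedLexicographic);
    }

    // User ids with a timestamp in [fromTs, toTs), in timestamp order: one contiguous
    // scan bounded by 8-byte, timestamp-only prefixes.
    static List<String> rangeByTime(TreeMap<byte[], String> store, long fromTs, long toTs) {
        byte[] lower = ByteBuffer.allocate(Long.BYTES).putLong(fromTs).array();
        byte[] upper = ByteBuffer.allocate(Long.BYTES).putLong(toTs).array();
        List<String> keys = new ArrayList<>();
        for (byte[] combo : store.subMap(lower, true, upper, false).keySet()) {
            keys.add(keyOf(combo));
        }
        return keys;
    }
}
```

Because the timestamp comes first, every entry in a time window falls in one contiguous key range that timestamp-only prefixes can bound. With the <key, timestamp> layout, the same window is spread across every user's key range, which is why Guozhang notes that a range fetch there has to read and filter far more records.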