Hello Navneeth,

I would agree with Liam that a session store seems like a good fit for your
case. But note that a session store will not expire sessions by itself: it
is still the processor node's job to find the already-expired sessions,
emit their results, and delete them. You can take a look at
KStreamSessionWindowAggregate in the Kafka code base (
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java)
for a reference.
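
Roughly, the pattern in that class is: for each incoming record, find any
existing sessions for the key that fall within the inactivity gap, merge
them, remove the superseded entries, and write the merged session back.
Just to illustrate the shape (a simplified, untested sketch, not the actual
implementation; the store name and the merge step are placeholders):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow; // internal class, used only to mirror the reference
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

// Hypothetical processor; "user-sessions" and the merge logic are placeholders.
public class UserSessionProcessor implements Processor<String, String> {
    private static final long GAP_MS = Duration.ofMinutes(10).toMillis();

    private ProcessorContext context;
    private SessionStore<String, String> sessionStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.sessionStore = (SessionStore<String, String>) context.getStateStore("user-sessions");
    }

    @Override
    public void process(final String userId, final String value) {
        final long ts = context.timestamp();
        long start = ts;
        long end = ts;
        String agg = value;

        // Merge any existing sessions for this user that fall within the
        // inactivity gap around the incoming record, and drop the old entries.
        try (final KeyValueIterator<Windowed<String>, String> it =
                 sessionStore.findSessions(userId, ts - GAP_MS, ts + GAP_MS)) {
            while (it.hasNext()) {
                final KeyValue<Windowed<String>, String> entry = it.next();
                start = Math.min(start, entry.key.window().start());
                end = Math.max(end, entry.key.window().end());
                // a real implementation would merge entry.value into agg here
                sessionStore.remove(entry.key);
            }
        }
        sessionStore.put(new Windowed<>(userId, new SessionWindow(start, end)), agg);

        // Note: nothing above expires anything. Emitting the "inactive" event
        // and deleting the session after 10 minutes of silence still has to be
        // driven explicitly, e.g. from a scheduled punctuation.
    }

    @Override
    public void close() { }
}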


Guozhang

On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hmmm, thanks Navneeth,
>
> I feel like a session store set to an inactivity period of 10 minutes,
> suppressed until the session window closes, combined with a GlobalKTable,
> would be how I'd start to approach this in the DSL, with the below topology.
> I have no idea if my ASCII art below will survive email formatting, so I'll
> try to explain. User ids stream into the GlobalKTable, and also into the
> session store. After 10 minutes of inactivity for a given user id key, the
> session expires, and the session store emits user_id -> some_value. I'd
> then map some_value to null, to take advantage of the KTable semantics
> where `k -> null` is treated as a delete for key k, so an inactive user
> would be deleted from the KTable. You could then periodically query the
> KTable's key-value store for outside emission.
>
> That said, this is only how I'd start to explore the problem, and there are
> obvious questions that need to be answered first, like how much state you'd
> end up storing in the session store, etc. I'm hoping someone like John
> Roesler, who has far better insight into Kafka Streams, might weigh in here.
>
>
> user ids ------------------------------------------------------>
> globalktable <---- keyValueStore periodically queried.
>       \------------> session store ----> map (user_id -> null) --/
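>
> In code, a rough and untested sketch of that shape might look like the
> following (topic names and the grace period are made up, and it assumes
> default serdes are configured and that this sits in whatever class builds
> your topology):
>
>     import java.time.Duration;
>     import org.apache.kafka.streams.KeyValue;
>     import org.apache.kafka.streams.StreamsBuilder;
>     import org.apache.kafka.streams.Topology;
>     import org.apache.kafka.streams.kstream.GlobalKTable;
>     import org.apache.kafka.streams.kstream.KStream;
>     import org.apache.kafka.streams.kstream.SessionWindows;
>     import org.apache.kafka.streams.kstream.Suppressed;
>
>     static Topology buildTopology() {
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, String> userEvents = builder.stream("user-events");
>
>         // Active users upsert straight into the table topic.
>         userEvents.to("user-activity");
>
>         // Session branch: once a user has been inactive for 10 minutes (plus
>         // grace), the closed session is emitted and turned into a tombstone.
>         userEvents
>             .groupByKey()
>             .windowedBy(SessionWindows.with(Duration.ofMinutes(10))
>                                       .grace(Duration.ZERO))
>             .reduce((oldValue, newValue) -> newValue)
>             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>             .toStream()
>             .map((windowedUserId, lastValue) ->
>                      KeyValue.pair(windowedUserId.key(), (String) null))
>             .to("user-activity");
>
>         // k -> null deletes inactive users from the table built on that topic;
>         // you can materialize it and query its store periodically.
>         final GlobalKTable<String, String> activeUsers =
>             builder.globalTable("user-activity");
>
>         return builder.build();
>     }
>
> One caveat with that sketch: suppress() works off stream time, not wall-clock
> time, so the tombstone only shows up once newer records advance stream time
> past the window close.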
>
> Good luck,
>
> Liam
>
> On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <
> reachnavnee...@gmail.com>
> wrote:
>
> > Hi Liam,
> >
> > The use case is to stream all data and send it to storage after processing.
> > Also, when a user is inactive for a 10 min period, send a special event
> > that marks the user as inactive. I'm trying to implement that special
> > event here.
> >
> > Thanks
> >
> >
> > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hey Navneeth,
> > >
> > > So to understand your problem better - do you only want to stream users
> > > active within 10 minutes to storage?
> > >
> > > Cheers,
> > >
> > > Liam
> > >
> > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <
> > > reachnavnee...@gmail.com>
> > > wrote:
> > >
> > > > It’s just for emitting to data storage. There is no join here.
> > > >
> > > > Thanks
> > > >
> > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> > > > liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hi Navneeth,
> > > > >
> > > > > What is the purpose of holding these user records? Is it to join
> > > > > against other streams, or emit to data storage?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Liam Clarke-Hutchinson
> > > > >
> > > > >
> > > > >
> > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <
> > > > > reachnavnee...@gmail.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I have a question about how I can use window stores to achieve this
> > > > > > use case. Thanks for all the help.
> > > > > >
> > > > > > A user record will be created when the user first logs in, and the
> > > > > > record needs to be cleaned up after 10 mins of inactivity. Thus each
> > > > > > user has a TTL, but the TTL value is refreshed every time the user is
> > > > > > active, until the user has been inactive for the entire 10 min period.
> > > > > > We are currently using the PAPI for all our topologies, and I was
> > > > > > thinking of implementing it using a punctuator.
> > > > > >
> > > > > > My initial logic was to have a KV store with each user as the key and
> > > > > > the TTL as the value, and run a scheduled task every minute that looks
> > > > > > at all the records whose TTL value is less than the current timestamp.
> > > > > > But the problem with this approach was performance: when there are
> > > > > > more than 1M records, it takes more than a few seconds to complete
> > > > > > this task.
> > > > > >
> > > > > > The next approach is to have a window store and a KV store. The window
> > > > > > store will hold each user with the corresponding TTL rounded to the
> > > > > > nearest minute. Then find all keys between the current time and the
> > > > > > current time - 1 min, iterate over them, and use the KV store to check
> > > > > > whether the TTL value is still the same or whether we have received
> > > > > > any updates since then. If not, the user will be evicted.
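> > > > > >
> > > > > > A rough sketch of that punctuation pass (store names are hypothetical,
> > > > > > the surrounding processor wiring is omitted, and this is untested):
> > > > > >
> > > > > >   // windowStore: WindowStore<String, Long>   (userId -> TTL rounded to the minute)
> > > > > >   // kvStore:     KeyValueStore<String, Long> (userId -> latest exact TTL)
> > > > > >   void expireInactiveUsers(final long now) {
> > > > > >       try (final KeyValueIterator<Windowed<String>, Long> candidates =
> > > > > >                windowStore.fetchAll(now - 60_000L, now)) {
> > > > > >           while (candidates.hasNext()) {
> > > > > >               final KeyValue<Windowed<String>, Long> entry = candidates.next();
> > > > > >               final String userId = entry.key.key();
> > > > > >               final Long latestTtl = kvStore.get(userId);
> > > > > >               // evict only if no newer activity has pushed the TTL forward
> > > > > >               if (latestTtl != null && latestTtl.equals(entry.value)) {
> > > > > >                   kvStore.delete(userId);
> > > > > >                   context.forward(userId, "INACTIVE"); // the special event
> > > > > >               }
> > > > > >           }
> > > > > >       }
> > > > > >   }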
> > > > > >
> > > > > > What would be a better and much more scalable solution for this?
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang
