I see. In that case I think your design with a KV store plus a book-keeping
window store would work better. One minor optimization you can try, though:
instead of checking whether the TTL has changed when expiring from the window
store, you can delete from the window store whenever you update the kv-store.
More specifically, when you update the kv-store, do something like this:

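// kvStore is a TimestampedKeyValueStore, so each value also carries its timestamp
ValueAndTimestamp<V> old = kvStore.get(k);
if (old != null && old.timestamp() != newTimestamp) {
  // the stale combo-key is <old-timestamp, key>, with the timestamp taken from
  // the old value; putting null into a window store deletes that entry
  windowStore.put(k, null, old.timestamp());
}

// v is the new value you want to put, with timestamp newTimestamp
kvStore.put(k, ValueAndTimestamp.make(v, newTimestamp));
windowStore.put(k, v, newTimestamp);  // the new combo-key is <new-timestamp-of-v, key>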

Later, when you expire entries from the window store, you no longer need to
check the kvStore for whether the value's timestamp has changed.
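As a rough illustration of this scheme, here is a Kafka-free sketch in Java. It assumes an in-memory `HashMap` as a stand-in for the timestamped kv-store and a `TreeMap` keyed by timestamp as a stand-in for the window store's `<timestamp, key>` ordering. The class `InactivityTracker` and its methods are hypothetical; in a real processor `expire` would be called from a punctuator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical Kafka-free sketch of "delete the old combo-key on every update".
class InactivityTracker {
    // Stand-in for the kv-store: user -> timestamp of last activity.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Stand-in for the window store: users grouped and ordered by timestamp,
    // so an ordered prefix scan plays the role of a time-range fetch.
    private final TreeMap<Long, Set<String>> byTime = new TreeMap<>();
    private final long inactivity;

    InactivityTracker(long inactivity) {
        this.inactivity = inactivity;
    }

    // On every update, remove the stale <old-timestamp, key> entry before
    // adding the new one, so the time index never holds outdated entries.
    void onActivity(String user, long ts) {
        Long previous = lastSeen.put(user, ts);
        if (previous != null) {
            Set<String> users = byTime.get(previous);
            if (users != null) {
                users.remove(user);
                if (users.isEmpty()) {
                    byTime.remove(previous);
                }
            }
        }
        byTime.computeIfAbsent(ts, t -> new HashSet<>()).add(user);
    }

    // Periodic expiry: every entry with timestamp <= now - inactivity is expired.
    // No kv-store check is needed because stale entries were already deleted.
    List<String> expire(long now) {
        NavigableMap<Long, Set<String>> due = byTime.headMap(now - inactivity, true);
        List<String> expired = new ArrayList<>();
        for (Set<String> users : due.values()) {
            expired.addAll(users);
        }
        for (String user : expired) {
            lastSeen.remove(user);
        }
        due.clear(); // clearing the view removes these entries from byTime
        Collections.sort(expired);
        return expired;
    }
}
```

With a 10-minute gap, activity at T and T+5 leaves only the T+5 entry in the time index, so the user expires once at T+15 and no stale entry is ever scanned.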




On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Thanks Liam & Guozhang.
>
> First of all, we use PAPI in our entire topology and we would like to
> keep it that way rather than combining it with the DSL. Secondly, I was
> also leaning more towards a session store, but the problem I found with the
> session store is that I cannot get all the expired sessions without keys,
> whereas the window store has the option to get all keys by range. Ideally I
> would like to have a punctuate function which finds all the expired records
> and sends them downstream. I looked at KStreamSessionWindowAggregate, but it
> looks like we need a new value coming in for the key to even send updates.
> In my case there might not be any activity at all, but I still need to send
> the delete event.
>
> Here is how we want it to work:
> T -> User1 (Active event)
> T+5 -> User1 (Active event)
> T+15 -> User1 (Delete event, since the user has been inactive for a 10 min
> period)
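The requirement that the delete fires without any new input can be checked with a small simulation. This is a hypothetical sketch, not Kafka code: times are whole minutes, a punctuation is assumed to run once per minute regardless of input (as a wall-clock punctuator would), and the expiry check is a plain scan over every user, which is the approach that becomes slow at 1M users.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: events arrive at given minutes (at most one per minute
// in this sketch), and a punctuation runs every minute whether or not one arrived.
class TimelineSimulation {
    static List<String> simulate(Map<Long, String> events, long gap, long until) {
        Map<String, Long> lastSeen = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (long now = 0; now <= until; now++) {
            // Process the event arriving at this minute, if any.
            String user = events.get(now);
            if (user != null) {
                lastSeen.put(user, now);
                output.add("T+" + now + " " + user + " active");
            }
            // Punctuation: emit a delete for every user idle for at least `gap`.
            Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (now - entry.getValue() >= gap) {
                    output.add("T+" + now + " " + entry.getKey() + " delete");
                    it.remove();
                }
            }
        }
        return output;
    }
}
```

Running it with events at minutes 0 and 5, a 10-minute gap, and 20 simulated minutes yields the same three steps as the timeline, with the delete produced at T+15 by the timer alone.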
>
> Thanks
>
> On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Navneeth,
> >
> > I would agree with Liam that a session store seems a good fit for your
> > case. But note that session stores do not expire sessions themselves;
> > it is still the processor node's job to find the already-expired
> > sessions and emit results / deletes. You can take a look at
> > KStreamSessionWindowAggregate in the Kafka code base for reference:
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
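For intuition about session windows with a 10-minute inactivity gap, here is a hypothetical Java sketch (not the `KStreamSessionWindowAggregate` logic): an event less than `gap` after the previous one extends the current session, otherwise it starts a new session. Treating a session as closed once time reaches its end plus the gap is this sketch's own convention; comparing that against the current time and emitting the delete is still the processor's job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of session windows with an inactivity gap.
class SessionSketch {
    // A session spans the first to the last event that belong to it.
    static final class Session {
        final long start;
        long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Convention in this sketch: closed once time reaches end + gap.
        long closesAt(long gap) {
            return end + gap;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + "]";
        }
    }

    // Groups ascending timestamps for one key into sessions.
    static List<Session> sessionize(long[] sortedTimestamps, long gap) {
        List<Session> sessions = new ArrayList<>();
        Session current = null;
        for (long ts : sortedTimestamps) {
            if (current != null && ts - current.end < gap) {
                current.end = ts; // still active: extend the session
            } else {
                current = new Session(ts, ts); // idle too long: new session
                sessions.add(current);
            }
        }
        return sessions;
    }
}
```

For events at 0, 5 and 30 with a gap of 10, this gives sessions [0,5] and [30,30], and the first one closes at 15.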
> >
> >
> > Guozhang
> >
> > On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> >
> > > Hmmm, thanks Navneeth,
> > >
> > > I feel like a session store set to an inactivity period of 10 minutes,
> > > suppressed until the session window closes, combined with a GlobalKTable
> > > would be how I'd start to approach this in the DSL, with the topology
> > > below. I have no idea if my ASCII art below will survive email
> > > formatting, so I'll try to explain. User ids stream into the GlobalKTable,
> > > and also into the session store. After 10 minutes of inactivity for a
> > > given user id key, the session expires, and the session store emits
> > > user_id -> some_value. I'd then map some_value to null, to take advantage
> > > of KTable semantics where `k -> null` is treated as a delete for key k, so
> > > an inactive user would be deleted from the KTable. You could then
> > > periodically query the KTable's key-value store for outside emission.
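The `k -> null` delete semantics mentioned above can be illustrated on a plain map. This is a hypothetical sketch of upsert/tombstone handling, not the KTable implementation: a record with a non-null value upserts its key, and a record with a null value removes it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of KTable-style upsert semantics on a plain map.
class TombstoneTable {
    static Map<String, String> apply(List<Map.Entry<String, String>> changelog) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                table.remove(record.getKey()); // tombstone: delete the key
            } else {
                table.put(record.getKey(), record.getValue()); // upsert
            }
        }
        return table;
    }
}
```

Applying `user1 -> active`, `user2 -> active`, `user1 -> null` leaves only `user2` in the table, which is how an inactive user would drop out.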
> > >
> > > That said, this is only how I'd start to explore the problem, and there
> > > are obvious questions that need to be answered first, like how much state
> > > you would end up storing in the session store, etc. I'm hoping someone
> > > like John Roesler, who has far better insights into Kafka Streams, might
> > > weigh in here.
> > >
> > >
> > > user ids --------------------------------------------------------> globalktable <---- keyValueStore periodically queried.
> > >       \------------> session store ----> map (user_id -> null) -----/
> > >
> > > Good luck,
> > >
> > > Liam
> > >
> > > On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> > >
> > > > Hi Liam,
> > > >
> > > > The use case is to stream all data and send it to storage after
> > > > processing. Also, when the user is inactive for a 10 min period, send a
> > > > special event that marks the user as inactive. I'm trying to implement
> > > > the special event here.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hey Navneeth,
> > > > >
> > > > > So to understand your problem better: do you only want to stream
> > > > > users active within the last 10 minutes to storage?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Liam
> > > > >
> > > > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> > > > >
> > > > > > It’s just for emitting to data storage. There is no join here.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> > > > > >
> > > > > > > Hi Navneeth,
> > > > > > >
> > > > > > > What is the purpose of holding these user records? Is it to join
> > > > > > > against other streams, or emit to data storage?
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Liam Clarke-Hutchinson
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <reachnavnee...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I have a question about how I can use window stores to achieve
> > > > > > > > this use case. Thanks for all the help.
> > > > > > > >
> > > > > > > > A user record will be created when the user first logs in, and
> > > > > > > > the record needs to be cleaned up after 10 mins of inactivity.
> > > > > > > > Thus each user has a TTL, but the TTL value is updated each time
> > > > > > > > the user is active, until the user has been inactive for the
> > > > > > > > entire 10 min period. We are currently using PAPI for all our
> > > > > > > > topologies and I was thinking of implementing it using a
> > > > > > > > punctuator.
> > > > > > > >
> > > > > > > > My initial logic was to have a KV store with each user as the key
> > > > > > > > and the TTL as the value, and run a scheduled task every minute
> > > > > > > > that looks at all the records whose TTL value is less than the
> > > > > > > > current timestamp. But the problem with this approach was
> > > > > > > > performance: when there are more than 1M records, the task takes
> > > > > > > > more than a few seconds to complete.
> > > > > > > >
> > > > > > > > The next approach is to have a window store and a KV store. The
> > > > > > > > window store will hold each user and the corresponding TTL rounded
> > > > > > > > to the nearest minute. Then find all keys between current time -
> > > > > > > > 1 min and the current time, iterate over these keys, and use the
> > > > > > > > KV store to check whether the TTL value is still the same or
> > > > > > > > whether we have received any updates since then. If the TTL is
> > > > > > > > unchanged and no updates have arrived, the user will be evicted.
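A hypothetical Kafka-free sketch of this second approach, with in-memory maps standing in for the window store and the KV store and times in milliseconds. Two choices here are assumptions rather than part of the description above: deadlines are rounded up to the next whole minute (so a bucket is never scanned before all of its deadlines have passed), and each run scans every bucket up to now instead of only the last minute, so a delayed run does not skip buckets. Stale entries stay in old buckets and are filtered out by the KV lookup at expiry time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: minute buckets plus a KV-store check at expiry time.
class MinuteBucketExpiry {
    static final long MINUTE = 60_000L;
    private final long ttlMs;
    // Stand-in for the KV store: user -> current expiry deadline (ms).
    private final Map<String, Long> deadline = new HashMap<>();
    // Stand-in for the window store: deadline rounded up to a minute -> users.
    private final TreeMap<Long, List<String>> buckets = new TreeMap<>();

    MinuteBucketExpiry(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void onActivity(String user, long nowMs) {
        long due = nowMs + ttlMs;
        deadline.put(user, due);
        // The user's old bucket entry is NOT removed here; it is filtered later.
        buckets.computeIfAbsent(roundUp(due), b -> new ArrayList<>()).add(user);
    }

    // Scheduled task: evict only users whose current deadline has really passed.
    List<String> expire(long nowMs) {
        List<String> evicted = new ArrayList<>();
        NavigableMap<Long, List<String>> dueBuckets = buckets.headMap(nowMs, true);
        for (List<String> users : dueBuckets.values()) {
            for (String user : users) {
                Long current = deadline.get(user);
                // A later deadline means the user was active again: stale entry.
                if (current != null && current <= nowMs) {
                    deadline.remove(user);
                    evicted.add(user);
                }
            }
        }
        dueBuckets.clear(); // drop the scanned buckets from the backing map
        return evicted;
    }

    private static long roundUp(long ms) {
        return ((ms + MINUTE - 1) / MINUTE) * MINUTE;
    }
}
```

Compared with the optimization in the reply at the top of this thread, this version pays for one KV lookup per bucket entry, stale or not, at expiry time.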
> > > > > > > >
> > > > > > > > What would be a better and much more scalable solution for this?
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
