Thanks a lot Guozhang. I will try and let you know.

Really appreciate all the help. This community has been amazing.

Thanks

On Tue, Feb 23, 2021 at 5:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Sorry I was not very clear before: by "WindowStore" I meant implementing
> your own customized store based on a kvStore where the key is a combo
> <timestamp, key>. Note that you should put the timestamp first and then the
> key in your serialization format, so that you can range-fetch with just a
> timestamp prefix. In fact, the `WindowStore` that we provide also follows
> this design principle, but its combo key is <key, timestamp>, so a range
> fetch over time is not as efficient: you'd need to fetch a much larger
> range and then filter out a lot of records.
>
>
> Guozhang
>
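A minimal sketch of the timestamp-first combo key described above, in plain Java with no Kafka dependency. It assumes non-negative epoch-millisecond timestamps and UTF-8 string keys; the class and method names are illustrative and are not part of the Kafka Streams API. Writing the timestamp as 8 big-endian bytes makes unsigned byte-wise order (the order RocksDB's default comparator uses) agree with time order, so a range scan between two timestamp prefixes visits entries oldest first.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TimeFirstComboKey {

    /** 8-byte big-endian timestamp prefix, followed by the UTF-8 key bytes. */
    static byte[] encode(long timestampMs, String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .putLong(timestampMs)
                .put(keyBytes)
                .array();
    }

    static long timestampOf(byte[] comboKey) {
        return ByteBuffer.wrap(comboKey, 0, Long.BYTES).getLong();
    }

    static String keyOf(byte[] comboKey) {
        return new String(comboKey, Long.BYTES, comboKey.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    /** Range-scan bound: every combo key with timestamp >= t sorts at or after this prefix. */
    static byte[] lowerBound(long timestampMs) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestampMs).array();
    }

    public static void main(String[] args) {
        byte[] a = encode(1_000L, "user-z");
        byte[] b = encode(2_000L, "user-a");
        // The earlier timestamp sorts first even though its key sorts later.
        System.out.println(Arrays.compareUnsigned(a, b) < 0);  // true
        System.out.println(timestampOf(b) + " " + keyOf(b));   // 2000 user-a
    }
}
```

Swapping the two fields gives the <key, timestamp> layout mentioned above: entries for one key become contiguous, but entries for one time range are scattered across all keys.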
> On Tue, Feb 23, 2021 at 4:04 PM Navneeth Krishnan <
> reachnavnee...@gmail.com>
> wrote:
>
> > Thanks Guozhang.
> >
> > I don't see the remove method in window stores. Am I missing something? It
> > would be very nice to implement the optimization you had mentioned.
> >
> > Thanks
> >
> > On Tue, Feb 23, 2021 at 11:11 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I see. In that case I think your design with a KV store plus a
> > > book-keeping window store would work better. One minor optimization you
> > > can try, though: instead of checking whether the TTL has changed when
> > > expiring from the window store, delete from the window store whenever you
> > > update the KV store. More specifically, when you update the KV store, do
> > > something like this:
> > >
> > > value = kvStore.get(k);  // value also encodes its timestamp, e.g. see the
> > >                          // "TimestampedKeyValueStore" interface
> > > if (value != v)
> > >   // v is the new value you want to put
> > >   windowStore.remove(combo-key);  // combo-key is <timestamp, key>, where
> > >                                   // timestamp is extracted from value
> > >
> > > kvStore.put(k, v);
> > > windowStore.put(combo-key);  // now <new-timestamp-of-v, key>
> > >
> > > Later, when you expire, you do not need to check the KV store to see
> > > whether the value's timestamp has changed.
> > >
> > >
> > >
> > >
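The update-time deletion suggested above can be modelled in memory as follows. This is a sketch only: a `HashMap` stands in for the KV store and a `TreeSet` ordered by <timestamp, key> stands in for the timestamp-first combo-key store; in a real PAPI processor both would be state stores and `expire` would be called from a punctuator. The class and method names are hypothetical. Because the stale index entry is dropped on every update, `expire` only touches entries that are actually due and never has to re-check the KV store for a newer timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class TimeIndexedStore {

    /** Index entry ordered by timestamp first, then key (the "combo key"). */
    static final class ComboKey implements Comparable<ComboKey> {
        final long timestamp;
        final String key;

        ComboKey(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(ComboKey other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    final Map<String, Long> kvStore = new HashMap<>();  // key -> last-active timestamp
    final TreeSet<ComboKey> index = new TreeSet<>();    // stand-in for the <timestamp, key> store

    /** Record activity; remove the old index entry so it can never expire the key early. */
    void update(String key, long timestamp) {
        Long previous = kvStore.put(key, timestamp);
        if (previous != null && previous != timestamp) {
            index.remove(new ComboKey(previous, key));
        }
        index.add(new ComboKey(timestamp, key));
    }

    /** Remove and return every key whose last activity is at or before the cutoff. */
    List<String> expire(long cutoff) {
        List<String> expired = new ArrayList<>();
        while (!index.isEmpty() && index.first().timestamp <= cutoff) {
            ComboKey due = index.pollFirst();
            kvStore.remove(due.key);
            expired.add(due.key);
        }
        return expired;
    }

    public static void main(String[] args) {
        TimeIndexedStore store = new TimeIndexedStore();
        store.update("user1", 0L);
        store.update("user2", 3L);
        store.update("user1", 5L);              // user1 active again; <0, user1> is removed
        System.out.println(store.expire(4L));   // [user2]
        System.out.println(store.expire(10L));  // [user1]
    }
}
```

With a 10-minute inactivity gap, a punctuator firing at time `now` would call `expire(now - gap)`.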
> > > On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <
> > > reachnavnee...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Liam & Guozhang.
> > > >
> > > > First of all, we use PAPI in our entire topology and we would like to
> > > > keep it that way rather than combining it with the DSL. Secondly, I was
> > > > also leaning towards a session store, but the problem I found is that I
> > > > cannot get all the expired sessions without knowing their keys, whereas
> > > > a window store has the option to fetch all keys by time range. Ideally I
> > > > would like to have a punctuate function that finds all the expired
> > > > records and sends them downstream. I looked at
> > > > KStreamSessionWindowAggregate, but it looks like a new value needs to
> > > > come in for a key before any updates are sent. In my case there might
> > > > not be any activity at all, but I still need to send the delete event.
> > > >
> > > > Here is how we want it to work:
> > > > T -> User1 (Active event)
> > > > T+5 -> User1 (Active event)
> > > > T+15 -> User1 (Delete event, since the user has been inactive for a
> > > > 10 min period)
> > > >
> > > > Thanks
> > > >
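The timeline above reduces to one rule: a user's delete event is due `gap` minutes after that user's last activity, and any activity before then pushes it back. A small worked example in plain Java, assuming activity times are given in minutes relative to T and already sorted; `deleteEventTimes` is a hypothetical helper, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

public class InactivityTimeline {

    /**
     * Given one user's sorted activity times (minutes) and an inactivity gap,
     * return the times at which a delete event would be due. A gap of exactly
     * gapMinutes counts as inactivity (an assumption).
     */
    static List<Long> deleteEventTimes(List<Long> activityMinutes, long gapMinutes) {
        List<Long> due = new ArrayList<>();
        for (int i = 0; i < activityMinutes.size(); i++) {
            long t = activityMinutes.get(i);
            boolean lastBeforeGap = i == activityMinutes.size() - 1
                    || activityMinutes.get(i + 1) - t >= gapMinutes;
            if (lastBeforeGap) {
                due.add(t + gapMinutes);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        // Active at T and T+5, so the delete event is due at T+15.
        System.out.println(deleteEventTimes(List.of(0L, 5L), 10L));       // [15]
        System.out.println(deleteEventTimes(List.of(0L, 5L, 30L), 10L));  // [15, 40]
    }
}
```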
> > > > On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Navneeth,
> > > > >
> > > > > I would agree with Liam that a session store seems a good fit for your
> > > > > case. But note that session stores do not expire sessions themselves;
> > > > > it is still the processor node's job to find already-expired sessions
> > > > > and emit results or delete them. You can take a look at
> > > > > KStreamSessionWindowAggregate in the Kafka code base
> > > > > (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java)
> > > > > for reference.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
> > > > > liam.cla...@adscale.co.nz> wrote:
> > > > >
> > > > > > Hmmm, thanks Navneeth,
> > > > > >
> > > > > > I feel like a session store set to an inactivity period of 10 minutes,
> > > > > > suppressed until the session window closes, combined with a GlobalKTable
> > > > > > would be how I'd start to approach this in the DSL, with the topology
> > > > > > below. I have no idea if my ASCII art below will survive email
> > > > > > formatting, so I'll try to explain. User ids stream into the
> > > > > > GlobalKTable, and also into the session store. After 10 minutes of
> > > > > > inactivity for a given user id key, the session expires, and the session
> > > > > > store emits user_id -> some_value. I'd then map some_value to null, to
> > > > > > take advantage of KTable semantics where `k -> null` is treated as a
> > > > > > delete for key k, so an inactive user would be deleted from the KTable.
> > > > > > You could then periodically query the KTable's key-value store for
> > > > > > outside emission.
> > > > > >
> > > > > > That said, this is only how I'd start to explore the problem, and there
> > > > > > are obvious questions that need to be answered first, like how much state
> > > > > > you would end up storing in the session store. I'm hoping someone like
> > > > > > John Roesler, who has far better insights into Kafka Streams, might weigh
> > > > > > in here.
> > > > > >
> > > > > >
> > > > > > user ids ----------------------------------------------------> globalktable <---- keyValueStore periodically queried.
> > > > > >       \------------> session store ----> map (user_id -> null) --/
> > > > > >
> > > > > > Good luck,
> > > > > >
> > > > > > Liam
> > > > > >
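The `k -> null` delete semantics relied on above can be illustrated without Kafka at all. The sketch below only models how a table materialized from a changelog treats a null value (delete) versus a non-null value (upsert); it does not use the Kafka Streams DSL, and `TombstoneTable` and `apply` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneTable {

    /** Apply one changelog record: a null value deletes the key, anything else upserts it. */
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        apply(table, "user1", "active");
        apply(table, "user2", "active");
        apply(table, "user1", null);  // user1's session closed and was mapped to null
        System.out.println(table);    // {user2=active}
    }
}
```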
> > > > > > On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <
> > > > > > reachnavnee...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Liam,
> > > > > > >
> > > > > > > The use case is to stream all data and send it to storage after
> > > > > > > processing. Also, when a user is inactive for a 10 min period, we send a
> > > > > > > special event that marks the user as inactive. I'm trying to implement
> > > > > > > that special event here.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
> > > > > > > liam.cla...@adscale.co.nz> wrote:
> > > > > > >
> > > > > > > > Hey Navneeth,
> > > > > > > >
> > > > > > > > So to understand your problem better: do you only want to stream users
> > > > > > > > active within the last 10 minutes to storage?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Liam
> > > > > > > >
> > > > > > > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <
> > > > > > > > reachnavnee...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > It’s just for emitting to data storage. There is no join here.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > >
> > > > > > > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> > > > > > > > > liam.cla...@adscale.co.nz> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Navneeth,
> > > > > > > > > >
> > > > > > > > > > What is the purpose of holding these user records? Is it to join against
> > > > > > > > > > other streams, or to emit to data storage?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Liam Clarke-Hutchinson
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <reachnavnee...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi All,
> > > > > > > > > > >
> > > > > > > > > > > I have a question about how I can use window stores to achieve this use
> > > > > > > > > > > case. Thanks for all the help.
> > > > > > > > > > >
> > > > > > > > > > > A user record is created when the user first logs in, and the record
> > > > > > > > > > > needs to be cleaned up after 10 mins of inactivity. Thus each user has
> > > > > > > > > > > a TTL, but the TTL is pushed forward each time the user is active, until
> > > > > > > > > > > the user stays inactive for an entire 10 min period. We are currently
> > > > > > > > > > > using PAPI for all our topologies, and I was thinking of implementing it
> > > > > > > > > > > using a punctuator.
> > > > > > > > > > >
> > > > > > > > > > > My initial logic was to have a KV store with each user as the key and the
> > > > > > > > > > > TTL as the value, and to run a scheduled task every minute that looks at
> > > > > > > > > > > all the records whose TTL is earlier than the current timestamp. But the
> > > > > > > > > > > problem with this approach was performance: when there are more than 1M
> > > > > > > > > > > records, the task takes more than a few seconds to complete.
> > > > > > > > > > >
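For comparison, the first approach corresponds roughly to the loop below, sketched with a `HashMap` in place of the KV store (in a real processor this would be a `KeyValueStore` iterated with `all()` inside a punctuator). `expireByFullScan` is a hypothetical helper. The point it illustrates is that each run visits every stored user, so its cost grows with the total number of users rather than with the number that actually expired, which is consistent with the slowdown described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class FullScanExpiry {

    /** Visit every entry and remove those whose TTL has passed: O(n) per run. */
    static List<String> expireByFullScan(Map<String, Long> ttlByUser, long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = ttlByUser.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < nowMs) {  // TTL already passed
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<String, Long> ttlByUser = new HashMap<>();
        ttlByUser.put("user1", 600_000L);
        ttlByUser.put("user2", 900_000L);
        System.out.println(expireByFullScan(ttlByUser, 700_000L));  // [user1]
    }
}
```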
> > > > > > > > > > > The next approach is to have a window store and a KV store. The window
> > > > > > > > > > > store will hold each user under the corresponding TTL rounded to the
> > > > > > > > > > > nearest minute. Then we find all keys between current time - 1 min and the
> > > > > > > > > > > current time, iterate over these keys, and use the KV store to check
> > > > > > > > > > > whether the TTL value is still the same or whether we have received any
> > > > > > > > > > > updates since then. If not, the user will be evicted.
> > > > > > > > > > >
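The second approach can be sketched the same way. Assumptions: a `TreeMap` from minute bucket to users stands in for the window store, a `HashMap` of exact TTLs stands in for the KV store, and `evict` is what a hypothetical per-minute punctuator would call; all names are illustrative. One deliberate deviation: TTLs are rounded up to the next minute rather than to the nearest minute, so a bucket is never processed before the TTLs it holds have passed (with round-to-nearest, a user whose TTL is unchanged could be evicted up to 30 seconds early).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

public class MinuteBucketExpiry {
    static final long MINUTE_MS = 60_000L;

    final Map<String, Long> ttlByUser = new HashMap<>();         // stand-in for the KV store
    final TreeMap<Long, Set<String>> buckets = new TreeMap<>();  // stand-in for the window store

    /** Round a TTL up to the next whole minute (assumes non-negative millisecond values). */
    static long bucketOf(long ttlMs) {
        return Math.floorDiv(ttlMs + MINUTE_MS - 1, MINUTE_MS) * MINUTE_MS;
    }

    /** Record activity: store the exact TTL and index the user under its bucket. */
    void touch(String user, long ttlMs) {
        ttlByUser.put(user, ttlMs);
        buckets.computeIfAbsent(bucketOf(ttlMs), b -> new HashSet<>()).add(user);
    }

    /** Evict users from every bucket at or before nowMs, skipping stale entries. */
    List<String> evict(long nowMs) {
        List<String> evicted = new ArrayList<>();
        NavigableMap<Long, Set<String>> due = buckets.headMap(nowMs, true);
        for (Map.Entry<Long, Set<String>> bucket : due.entrySet()) {
            for (String user : bucket.getValue()) {
                Long ttl = ttlByUser.get(user);
                // Evict only if this bucket still matches the user's current TTL;
                // otherwise the user was active again and sits in another bucket.
                if (ttl != null && bucketOf(ttl) == bucket.getKey()) {
                    ttlByUser.remove(user);
                    evicted.add(user);
                }
            }
        }
        due.clear();  // the view is backed by `buckets`, so this drops the processed buckets
        return evicted;
    }

    public static void main(String[] args) {
        MinuteBucketExpiry store = new MinuteBucketExpiry();
        store.touch("user1", 600_000L);  // TTL at minute 10
        store.touch("user2", 610_000L);  // rounds up to the minute-11 bucket
        store.touch("user1", 900_000L);  // user1 active again; TTL moves to minute 15
        System.out.println(store.evict(600_000L));  // [] (user1's minute-10 entry is stale)
        System.out.println(store.evict(660_000L));  // [user2]
        System.out.println(store.evict(900_000L));  // [user1]
    }
}
```

The stale-entry check against the KV store is the step that Guozhang's later suggestion avoids by deleting the old index entry at update time.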
> > > > > > > > > > > What would be a better and much more scalable solution for this?
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
