Daan,

Thanks for the replies, those make sense to me.
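
To make the co-partitioning answer to question 1 (quoted below) concrete, here
is a small Python sketch; it is not Kafka code. It assumes that task p of app B
reads changelog partition p into its read-only store and processes input
partition p. The helper names (`partition_for`, `local_lookup_ratio`) are made
up for illustration, and CRC32 merely stands in for Kafka's default
murmur2-based partitioner.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in partitioner (assumption): CRC32 instead of Kafka's murmur2,
    # used only to keep this sketch dependency-free.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def local_lookup_ratio(keys, changelog_partitions: int, input_partitions: int) -> float:
    """Fraction of keys that task p of app B finds in its own store.

    A key is "local" when the changelog partition that holds it (written by
    app A) has the same number as the input partition on which app B
    receives records for that key.
    """
    keys = list(keys)
    if not keys:
        return 1.0
    local = sum(
        1
        for key in keys
        if partition_for(key, changelog_partitions) == partition_for(key, input_partitions)
    )
    return local / len(keys)


if __name__ == "__main__":
    sample_keys = [f"key-{i}" for i in range(100)]
    # Same partition count on both sides: every lookup is task-local.
    print(local_lookup_ratio(sample_keys, 4, 4))
    # Different partition counts: some keys live in another task's store.
    print(local_lookup_ratio(sample_keys, 4, 6))
```

With equal partition counts every lookup is local. With mismatched counts some
keys end up in a different task's store, which is the case where one would
otherwise need a full copy per task (as with global stores) or an IQ request.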

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:

> I just updated the KIP to reflect the things discussed in this thread.
>
> As for your questions Guozhang:
>
> > 1) How do we handle if the num.partitions of app A's store changelog is
> > different from the num.tasks of app B's sub-topology with that read-only
> > store? Or are we going to let each task of B keep a whole copy of the store
> > of A by reading all of its changelog partitions, like global stores?
>
> Good question. Both need to be co-partitioned to have the data available.
> Another option would be to use IQ to make the request, but that seems far
> from ideal.
>
> > 2) Are we trying to synchronize the store updates from the changelog to app
> > B's processing timelines, or just like what we do for global stores that we
> > just update the read-only stores async?
>
> Pretty much the same as we do for global stores.
>
> > 3) If the answer to both of the above questions is the latter, then what's
> > the main difference of adding a read-only store v.s. adding a global store?
>
> I think because of the first answer the behavior differs from global
> stores.
>
> Makes sense?
>
> Cheers,
>
> D.
>
> From: Matthias J. Sax <mj...@apache.org>
> Date: Thursday, 20 January 2022 at 21:12
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> > Any processor that would use that materialized, read-only statestore
> > would need to wait for the store to be restored. I can't find a way to make
> > that possible since processors can't wait for the statestore to be restored.
>
> This is built into the runtime already. Nothing to worry about. It's
> part of the regular restore logic -- as long as any store is restoring,
> all processing is blocked.
>
> > Also, since the statestore would have logging disabled, it means there
> > is no initial restoration going on.
>
> No. When we hook up the input topic as changelog (as the DSL does), we
> restore from the input topic during the regular restore phase. The restore
> logic does not even know it's reading from the input topic rather than from
> a "*-changelog" topic.
>
> Disabling changelogging only affects the write path (ie,
> `store.put()`) but not the restore path, due to the internal "hookup" of
> the input topic inside the restore logic.
>
> It's not easy to find/understand by reverse engineering I guess, but
> it's there.
>
> One pointer where the actual hookup happens (might help to dig into it
> more if you want):
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>
>
> -Matthias
>
>
> On 1/20/22 10:04 AM, Guozhang Wang wrote:
> > Hello Daan,
> >
> > Thanks for writing the KIP. I just read through it and just my 2c here: to
> > me it seems that one of the goals would be to "externalize" the internal
> > changelog topic of an application (say A) so that other consumers can
> > read it directly --- though technically without any auth, anyone knowing
> > the topic name would be able to write to it too, conceptually we would just
> > assume that app A is the only writer of that topic --- The question I had
> > is how much we want to externalize the topic. For example we can,
> > orthogonally to this KIP, just allow users to pass in a customized topic
> > name when constructing a state store, instructing application A to use
> > that as the changelog, and since that topic is created outside of A and is
> > publicly visible to anyone else on that cluster, anyone --- including any
> > consumers, or streams apps --- can read it. This is probably most flexible
> > as for sharing, but we are even less assured that application A is the only
> > writer to that external topic unless we have explicit auth for A on that
> > topic.
> >
> > Aside from that, here are a few more detailed comments about the
> > implementation design itself following your current proposal:
> >
> > 1) How do we handle if the num.partitions of app A's store changelog is
> > different from the num.tasks of app B's sub-topology with that read-only
> > store? Or are we going to let each task of B keep a whole copy of the store
> > of A by reading all of its changelog partitions, like global stores?
> > 2) Are we trying to synchronize the store updates from the changelog to app
> > B's processing timelines, or just like what we do for global stores that we
> > just update the read-only stores async?
> > 3) If the answer to both of the above questions is the latter, then what's
> > the main difference of adding a read-only store v.s. adding a global store?
> >
> > Guozhang
> >
> >
> >
> > On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com>
> > wrote:
> >
> >> Hey Matthias,
> >>
> >> Thank you for that feedback, certainly some things to think about. Let me
> >> add my thoughts:
> >>
> >> +1 on simplifying the motivation. Was aiming to add more context but I
> >> think you're right, bringing it back to the essence makes more sense.
> >>
> >> I also follow the reasoning of not having leader and follower. Makes sense
> >> to view it from a single app point of view.
> >>
> >> As for the API method and its parameters, I wanted to stay close to the
> >> API for adding a regular statestore, but I am perfectly fine with
> >> defining an addReadOnlyStateStore() method instead.
> >>
> >> I agree the processor approach would be the most flexible one, and surely
> >> it allows you to use a processor to base the statestore off an existing
> >> topic. From what I understood from the codebase, there might be a problem
> >> when using that statestore. Any processor that would use that materialized,
> >> read-only statestore would need to wait for the store to be restored. I
> >> can't find a way to make that possible since processors can't wait for the
> >> statestore to be restored. Also, since the statestore would have logging
> >> disabled, it means there is no initial restoration going on. As you wrote,
> >> the DSL is already doing this, so I'm pretty sure I'm missing something,
> >> just unable to find what exactly.
> >>
> >> I will rewrite the parts in the KIP to make processor-based the preferred
> >> choice, along with the changes to the motivation etc. Only thing to figure
> >> out is that restoring behavior to be sure processors of the readonly
> >> statestore aren't working with stale data.
> >>
> >> D.
> >>
> >> -----Original Message-----
> >> From: Matthias J. Sax <mj...@apache.org>
> >> Sent: 19 January 2022 21:31
> >> To: dev@kafka.apache.org
> >> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> >>
> >> Daan,
> >>
> >> thanks for the KIP. I personally find the motivation section a little bit
> >> confusing. If I understand the KIP correctly, you want to read a topic into
> >> a state store (ie, materialize it). This is already possible today.
> >>
> >> Of course, today a "second" changelog topic would be created. It seems the
> >> KIP aims to avoid the additional changelog topic, and to allow re-using
> >> the original input topic (this optimization is already available for the
> >> DSL, but not for the PAPI).
> >>
> >> If my observation is correct, we can simplify the motivation accordingly
> >> (the fact that you want to use this feature to share state across different
> >> applications more efficiently seems to be secondary and we could omit it
> >> IMHO to keep the motivation focused).
> >>
> >> As a result, we also don't need the concept of "leader" and "follower".
> >> In the end, Kafka Streams cannot reason/enforce any usage patterns across
> >> different apps, but we can only guarantee stuff within a single application
> >> (ie, don't create a changelog but reuse an input topic as changelog). It
> >> would simplify the KIP if we remove these parts.
> >>
> >>
> >>
> >> For the API, I am wondering why you propose to pass in `processorNames`?
> >> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
> >> (similar to what we do for `addGlobalStore`)? The provided `Processor`
> >> must implement a certain pattern, ie, take each input record and apply it
> >> unmodified to the state store (ie, the Processor will be solely
> >> responsible for maintaining the state store). We might also need to pass
> >> other arguments, similar to `addGlobalStore`, into this method. (More
> >> below.)
> >>
> >> If other processors need to read the state store, they can be connected to
> >> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
> >> approach to keep `processorName` would also be possible, but IMHO all those
> >> should only _read_ the state store (but not modify it), to keep a clear
> >> conceptual separation.
> >>
> >> About the method name: wondering if we should use a different name to be
> >> more explicit what the method does? Maybe `addReadOnlyStateStore`?
> >>
> >>
> >>
> >> Btw: please omit any code snippets and only put the newly added method
> >> signature in the KIP.
> >>
> >> What I don't yet understand is the section "Allow state stores to
> >> continue listening for changes from their changelog". Can you elaborate?
> >>
> >> About:
> >>
> >>> Since a changelog topic is created with the application id in its name,
> >>> it would allow us to check in the follower if the changelog topic starts
> >>> with our application id. If it doesn’t, we are not allowed to send a log.
> >>
> >> The DSL implements this differently, and just disables the changelog for
> >> the state store (ie, for the "follower"). We could do the same thing
> >> (either enforcing that the provided `StoreBuilder` has changelogging
> >> disabled, or by just ignoring it and disabling it in a hard-coded way).
> >>
> >>
> >> Ie, overall I would prefer the "source-processor approach" that you put
> >> into rejected alternatives. Note that the problem you call out, namely
> >>
> >>> Problem with this approach is the lack of having restoring support
> >>> within the state store
> >>
> >> does not apply. A restore is absolutely possible and the DSL already
> >> supports it.
> >>
> >>
> >> Or is your concern with regard to performance? The "source-processor
> >> approach" would have the disadvantage that input data is first
> >> deserialized, fed into the Processor, and then serialized again when put
> >> into the state store. Re-using the state restore code is a good idea
> >> from a performance point of view, but it might require quite some
> >> internal changes (your proposal to "not stop restoring" might not work
> >> as it could trigger quite some undesired side-effects given the current
> >> architecture of Kafka Streams).
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >>
> >> On 1/16/22 11:52 PM, Daan Gertis wrote:
> >>> Hey everyone,
> >>>
> >>> Just created a KIP on sharing statestore state across multiple
> >>> applications without duplicating the data on multiple changelog topics.
> >>> Have a look and tell me what you think or what to improve. This is my first
> >>> one, so please be gentle 😉
> >>>
> >>> https://cwiki.apache.org/confluence/x/q53kCw
> >>>
> >>> Cheers!
> >>> D.
> >>
> >
> >
>


-- 
-- Guozhang
