Daan,

Thanks for the replies, those make sense to me.

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:

> I just updated the KIP to reflect the things discussed in this thread.
>
> As for your questions Guozhang:
>
> > 1) How do we handle it if the num.partitions of app A's store changelog is different from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
>
> Good question. Both need to be co-partitioned to have the data available. Another option would be to use IQ to make the request, but that seems far from ideal.
>
> > 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, just like what we do for global stores, do we just update the read-only stores async?
>
> Pretty much the same as we do for global stores.
>
> > 3) If the answers to both of the above questions are the latter, then what's the main difference of adding a read-only store vs. adding a global store?
>
> I think because of the first answer the behavior differs from global stores.
>
> Makes sense?
>
> Cheers,
> D.
>
> From: Matthias J. Sax <mj...@apache.org>
> Date: Thursday, 20 January 2022 at 21:12
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>
> > Any processor that would use that materialized, read-only statestore would need to wait for the store to be restored. I can't find a way to make that possible since processors can't wait for the statestore to be restored.
>
> This is built into the runtime already. Nothing to worry about. It's part of the regular restore logic -- as long as any store is restoring, all processing is blocked.
>
> > Also, since the statestore would have logging disabled, it means there is no initial restoration going on.
>
> No. When we hook up the input topic as changelog (as the DSL does) we restore from the input topic during the regular restore phase. The restore logic does not even know it's reading from the input topic (and not from a "*-changelog" topic).
>
> Disabling changelogging only affects the write path (ie, `store.put()`) but not the restore path, due to the internal "hookup" of the input topic inside the restore logic.
>
> It's not easy to find/understand by reverse engineering I guess, but it's there.
>
> One pointer where the actual hookup happens (might help to dig into it more if you want):
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>
> -Matthias
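
To make the DSL behavior described above concrete, this is roughly what re-using an input topic as a store's changelog looks like from the application side today (a minimal sketch using only existing public API; the topic and store names are made up):

    // Sketch only; "shared-input-topic" and "shared-store" are illustrative names.
    final Properties props = new Properties();
    // With this optimization enabled, the source topic itself is used to restore
    // the store, so no separate "<appId>-shared-store-changelog" topic is created.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, String> shared = builder.table(
        "shared-input-topic",
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("shared-store"));

The internal hookup Matthias points to is what wires "shared-input-topic" up as the restore source here; the PAPI has no equivalent today, which is what the KIP is after.
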
> On 1/20/22 10:04 AM, Guozhang Wang wrote:
> > Hello Daan,
> >
> > Thanks for writing the KIP. I just read through it and just my 2c here: to me it seems that one of the goals would be to "externalize" the internal changelog topic of an application (say A) so that other consumers can directly read it --- though technically without any auth, anyone knowing the topic name would be able to write to it too, conceptually we would just assume that app A is the only writer of that topic --- The question I had is how much we want to externalize the topic. For example we could, orthogonally to this KIP, just allow users to pass in a customized topic name when constructing a state store, indicating that application A should use that as the changelog; since that topic is created outside of A, it is publicly visible to anyone else on that cluster --- including any consumers or other streams apps. This is probably most flexible for sharing, but we are even less assured that application A is the only writer to that external topic unless we have explicit auth for A on that topic.
> >
> > Aside of that, here are a few more detailed comments about the implementation design itself following your current proposal:
> >
> > 1) How do we handle it if the num.partitions of app A's store changelog is different from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
> > 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, just like what we do for global stores, do we just update the read-only stores async?
> > 3) If the answers to both of the above questions are the latter, then what's the main difference of adding a read-only store vs. adding a global store?
> >
> > Guozhang
> >
> > On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com> wrote:
> >
> >> Hey Matthias,
> >>
> >> Thank you for that feedback, certainly some things to think about. Let me add my thoughts:
> >>
> >> +1 on simplifying the motivation. Was aiming to add more context but I think you're right, bringing it back to the essence makes more sense.
> >>
> >> I also follow the reasoning of not having leader and follower. Makes sense to view it from a single app point of view.
> >>
> >> As for the API method and its parameters, I wanted to stay close to the API for adding a regular statestore, but I can perfectly see myself defining an addReadOnlyStateStore() method instead.
> >>
> >> I agree the processor approach would be the most flexible one, and surely it allows you to use a processor to base the statestore off an existing topic. From what I understood from the codebase, there might be a problem when using that statestore. Any processor that would use that materialized, read-only statestore would need to wait for the store to be restored. I can't find a way to make that possible since processors can't wait for the statestore to be restored. Also, since the statestore would have logging disabled, it means there is no initial restoration going on. As you wrote, the DSL is already doing this, so I'm pretty sure I'm missing something, just unable to find what exactly.
> >>
> >> I will rewrite the parts in the KIP to make processor-based the preferred choice, along with the changes to the motivation etc. Only thing to figure out is the restoring behavior, to be sure processors of the read-only statestore aren't working with stale data.
> >>
> >> D.
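
For reference, the "processor approach" mentioned above would be wired up with today's PAPI roughly like this (a sketch only; the node, topic, and store names are made up, and the open question in the thread, restoring the logging-disabled store from the input topic, is exactly what this wiring does not provide by itself):

    StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("shared-store"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled();   // no extra changelog topic for this store

    Topology topology = new Topology();
    topology.addSource("shared-source",
        Serdes.String().deserializer(), Serdes.String().deserializer(),
        "shared-input-topic");

    // Pass-through processor: solely responsible for writing every record,
    // unmodified, into the read-only store.
    ProcessorSupplier<String, String, Void, Void> updater = () ->
        new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("shared-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                store.put(record.key(), record.value());
            }
        };

    topology.addProcessor("shared-store-updater", updater, "shared-source");
    topology.addStateStore(storeBuilder, "shared-store-updater");

    // Processors that only read the store would be connected explicitly:
    // topology.connectProcessorAndStateStores("my-reader", "shared-store");
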
> >> -----Original Message-----
> >> From: Matthias J. Sax <mj...@apache.org>
> >> Sent: 19 January 2022 21:31
> >> To: dev@kafka.apache.org
> >> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> >>
> >> Daan,
> >>
> >> thanks for the KIP. I personally find the motivation section a little bit confusing. If I understand the KIP correctly, you want to read a topic into a state store (ie, materialize it). This is already possible today.
> >>
> >> Of course, today a "second" changelog topic would be created. It seems the KIP aims to avoid the additional changelog topic, and to allow re-using the original input topic (this optimization is already available for the DSL, but not for the PAPI).
> >>
> >> If my observation is correct, we can simplify the motivation accordingly (the fact that you want to use this feature to share state across different applications more efficiently seems to be secondary and we could omit it IMHO to keep the motivation focused).
> >>
> >> As a result, we also don't need the concept of "leader" and "follower". In the end, Kafka Streams cannot reason about/enforce any usage patterns across different apps; we can only guarantee stuff within a single application (ie, don't create a changelog but reuse an input topic as changelog). It would simplify the KIP if we remove these parts.
> >>
> >> For the API, I am wondering why you propose to pass in `processorNames`? To me, it seems more reasonable to pass a `ProcessorSupplier` instead (similar to what we do for `addGlobalStore`)? The provided `Processor` must implement a certain pattern, ie, take each input record and apply it unmodified to the state store (ie, the Processor will be solely responsible for maintaining the state store). We might also need to pass in other arguments, similar to `addGlobalStore`, into this method. (More below.)
> >>
> >> If other processors need to read the state store, they can be connected to it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid approach to keep `processorName` would also be possible, but IMHO all those should only _read_ the state store (but not modify it), to keep a clear conceptual separation.
> >>
> >> About the method name: wondering if we should use a different name to be more explicit about what the method does? Maybe `addReadOnlyStateStore`?
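
For concreteness, one hypothetical shape for such a method, mirroring today's Topology#addGlobalStore (the name, parameter set, and generics here are illustrative only, not something this thread has agreed on):

    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(
        final StoreBuilder<?> storeBuilder,
        final String sourceName,
        final Deserializer<KIn> keyDeserializer,
        final Deserializer<VIn> valueDeserializer,
        final String topic,
        final String processorName,
        final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);

Processors that only read the store would then be attached via connectProcessorAndStateStores(processorName, storeName), as suggested above.
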
The "source-processor > >> approach" would have the disadvantage that input data is first > >> deserialized, fed into the Processor, and than serialized again when put > >> into the state store. Re-using the state restore code is a good idea > >> from a performance point of view, but it might require quite some > >> internal changes (your proposal to "not stop restoring" might not work > >> as it could trigger quite some undesired side-effects given the current > >> architecture of Kafka Streams). > >> > >> > >> -Matthias > >> > >> > >> > >> > >> On 1/16/22 11:52 PM, Daan Gertis wrote: > >>> Hey everyone, > >>> > >>> Just created a KIP on sharing statestore state across multiple > >> applications without duplicating the data on multiple changelog topics. > >> Have a look and tell me what you think or what to improve. This is my > first > >> one, so please be gentle 😉 > >>> > >>> https://cwiki.apache.org/confluence/x/q53kCw > >>> > >>> Cheers! > >>> D. > >> > > > > > -- -- Guozhang