Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.

Overall, I think I agree with you that passing in the StoreBuilder directly, rather than the store name, is more convenient, as it does not require another `addStore` call; we just need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this learning curve, but I can be convinced if most people feel it is worthwhile.

Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
> I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, using it only to convey the main ideas, and I'll address some points from our earlier discussion separately:
>
> - Will there be a new, separate interface for users to implement for the new functionality? No. To keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation, which returns an empty list.
> - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
> - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
> - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience: it will allow adding the same StoreBuilder multiple times, as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent the issue of accidentally merging two state stores into one by adding them under the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().
>
> So, in summary, the use cases look like:
>
> - One transformer/processor that owns its store: it uses the new StateStoresSupplier interface method to supply its StoreBuilders, which are added to the topology automatically.
> - Multiple transformers/processors that share the same store: either
>   1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations and is added to the topology manually by the user, or
>   2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it.
>
> This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.
>
> Thanks!
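The design summarized in Paul's message can be sketched in a self-contained way. This is a toy model, not Kafka Streams code: `ToyStoreBuilder`, `ToyTopology`, `addProcessor` and `SharingSupplier` are hypothetical names, and only the `StateStoresSupplier` interface name and the relaxed `addStateStore()` rule come from the proposal. It assumes that "the exact same StoreBuilder instance" means reference identity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's StoreBuilder; only the store name matters here.
final class ToyStoreBuilder {
    private final String name;

    ToyStoreBuilder(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Sketch of the proposed mix-in: suppliers opt in by overriding stateStores().
interface StateStoresSupplier {
    default List<ToyStoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Hypothetical topology with the relaxed addStateStore() rule (assumption: identity check).
final class ToyTopology {
    private final Map<String, ToyStoreBuilder> stores = new HashMap<>();
    private final Map<String, List<String>> connections = new HashMap<>();

    void addStateStore(ToyStoreBuilder builder) {
        ToyStoreBuilder existing = stores.get(builder.name());
        if (existing == null) {
            stores.put(builder.name(), builder);
        } else if (existing != builder) {
            // Same name, different instance: likely two stores named the same by mistake.
            throw new IllegalStateException("Store " + builder.name() + " is already added.");
        }
        // Same name, same instance: a no-op, so several suppliers can declare a shared store.
    }

    // Hypothetical: roughly what stream.transform(supplier) could do under the hood.
    void addProcessor(String processorName, StateStoresSupplier supplier) {
        List<String> storeNames = new ArrayList<>();
        for (ToyStoreBuilder builder : supplier.stateStores()) {
            addStateStore(builder);
            storeNames.add(builder.name());
        }
        connections.put(processorName, storeNames);
    }

    int storeCount() {
        return stores.size();
    }

    List<String> storesOf(String processorName) {
        return connections.get(processorName);
    }
}

// Hypothetical supplier that returns whatever builder instance it was given.
final class SharingSupplier implements StateStoresSupplier {
    private final ToyStoreBuilder builder;

    SharingSupplier(ToyStoreBuilder builder) {
        this.builder = builder;
    }

    @Override
    public List<ToyStoreBuilder> stateStores() {
        return Collections.singletonList(builder);
    }
}
```

With this rule, two suppliers that return the same builder instance register the store once and are both connected to it, while an independently created builder that is also named "store1" still fails fast. Comparing with `!=` rather than `equals()` is what keeps accidental name clashes detectable.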
> Paul
>
> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> Matthias / Paul,
>>
>> The concern I had about introducing `StoreBuilderSupplier` is simply that it adds another XXSupplier to the public API, so I'd like to ask whether we really have to add it :)
>>
>> The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:
>>
>> -----------
>>
>> String storeName = "store1";
>> builder.addStore(new MyStoreBuilder(storeName));
>> // Following my proposal, the store name can be passed in and used both for
>> // `listStores` and in `Transformer#init`, so the Transformer function does
>> // not need to get the constant string name again.
>> stream1.transform(new MyTransformerSupplier(storeName));
>>
>> // One caveat to admit is that the MyTransformerSupplier logic may be unique
>> // to `store1`, so it cannot be reused with a different store name anyway.
>>
>> -----------
>>
>> While in the latter:
>>
>> -----------
>>
>> // The name just indicates that we may have one such supplier for each store.
>> stream1.transform(new MyTransformerSupplierForStore1());
>>
>> -----------
>>
>> I understand the latter introduces more convenience in the API, but the cost is that we still cannot completely remove `builder.addStore`, only reduce its scope to shared state stores; hence users need to learn two ways of creating state stores for those two patterns.
>>
>> My argument is that more public APIs require a longer learning curve for users and introduce more usage patterns that may confuse them (my proposal tries to replace one pattern with the other completely).
>>
>>
>> Guozhang
>>
>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>> Thanks for the great thoughts Matthias and Guozhang!
>>>
>>> If I'm not mistaken, Guozhang's suggestion is the second alternative on the KIP ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StoreBuilder.
>>>
>>> I'm really intrigued by Matthias's idea, which forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea, and I think it gives a cleaner API than what I proposed. The non-shared store use case is handled well here, and the shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems too subtle to me).
>>>
>>> We all agree that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a possibly radical idea: change `topology.addStateStore()` to be idempotent for a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added. Does this eliminate all the bugs we're worried about? Having thought about it for a few minutes, it seems to eliminate at least most of them (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety, but a good error message should ease the pain, and the error would surface immediately during development.
>>>
>>> As for Matthias's comment about whether we need to deprecate the existing varargs transform methods: I don't think we need to, but it might be nice to have only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.
>>>
>>> Paul
>>>
>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>> Guozhang,
>>>>
>>>>>> Regarding the last option to catch "store exist already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>
>>>> I agree with this concern. From my original email:
>>>>
>>>>> The only disadvantage I see might be potential bugs from sharing state if two different stores are named the same by mistake (this would not be detected).
>>>>
>>>> For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>> Matthias,
>>>>>
>>>>> Thanks for your feedback.
>>>>>
>>>>> Regarding the last option to catch "store exist already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>>
>>>>> Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e. we still let users create shareable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of the state stores that it needs:
>>>>>
>>>>> public interface TransformerSupplier<K, V, R> {
>>>>>     Transformer<K, V, R> get();
>>>>>
>>>>>     default List<String> stateStoreNames() {
>>>>>         return Collections.emptyList();
>>>>>     }
>>>>> }
>>>>>
>>>>> By doing this, users can still "consolidate" the references to store names in a single place in the transform call, e.g.:
>>>>>
>>>>> public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
>>>>>     private final String storeName;
>>>>>
>>>>>     public MyTransformerSupplier(String storeName) {
>>>>>         this.storeName = storeName;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Transformer<K, V, R> get() {
>>>>>         return new MyTransformer();
>>>>>     }
>>>>>
>>>>>     public class MyTransformer implements Transformer<K, V, R> {
>>>>>         // .... (the `store` field, transform() and close() are omitted)
>>>>>
>>>>>         @Override
>>>>>         public void init(ProcessorContext context) {
>>>>>             store = context.getStateStore(storeName);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public List<String> stateStoreNames() {
>>>>>         return Collections.singletonList(storeName);
>>>>>     }
>>>>> }
>>>>>
>>>>> Basically, we move the parameters from the caller of `transform` to inside the TransformerSuppliers. DSL implementations would not change much, simply calling `connectStateStore` with the list of names obtained from the provided function.
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?
>>>>>>
>>>>>> The last argument is a vararg, and thus just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?
>>>>>>
>>>>>> Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()` -- users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once; for this case, we require that users don't provide store names in `transform()`.
>>>>>>
>>>>>> Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows "auto-wiring" a transformer to existing stores (avoiding the issue of adding the same store multiple times).
>>>>>>
>>>>>> Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
>>>>>>
>>>>>> Another possibility to avoid the issue of adding the same stores multiple times would be for the DSL to always call `addStateStore()` but catch a potential "store exists already" exception and fall back to `connectProcessorAndStateStore()` in this case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either.
The only disadvantage I see, > > might > > > be > > > > >> potential bugs about sharing state if two different stores are > named > > > the > > > > >> same by mistake (this would not be detected). > > > > >> > > > > >> > > > > >> > > > > >> Just some ideas I wanted to share. What do you think? > > > > >> > > > > >> > > > > >> > > > > >> -Matthias > > > > >> > > > > >> On 12/11/18 3:46 AM, Paul Whalen wrote: > > > > >>> Ah yes of course, this was an oversight, I completely ignored the > > > > >> multiple > > > > >>> processors sharing the same state store when writing up the KIP. > > > Which > > > > >> is > > > > >>> funny, because I've actually done this (different processors > > sharing > > > > >> state > > > > >>> stores) a fair amount myself, and I've settled on a pattern > where I > > > > group > > > > >>> the Processors in an enclosing class, and that enclosing class > > > handles > > > > as > > > > >>> much as possible. Here's a gist showing the rough structure, > just > > > for > > > > >>> context: > > > > >> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 > > > > >>> . Note how it adds the stores to the topology, as well as > > providing a > > > > >>> public method with the store names. > > > > >>> > > > > >>> I don't think my proposal completely conflicts with the multiple > > > > >> processors > > > > >>> sharing state stores use case, since you can create a supplier > that > > > > >>> provides the store name you want, somewhat independently of your > > > actual > > > > >>> Processor logic. The issue I do see though, is that > > > > >>> topology.addStateStore() can only be called once for a given > store. > > > So > > > > >> for > > > > >>> your example, if the there was a single TransformerSupplier that > > was > > > > >> passed > > > > >>> into both transform() calls, "store1" would be added (under the > > hood) > > > > to > > > > >>> the topology twice, which is no good. 
>>>>>>>
>>>>>>> Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
>>>>>>>
>>>>>>> Paul
>>>>>>>
>>>>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>> Hello Paul,
>>>>>>>>
>>>>>>>> Thanks for the great writeup (very detailed and crystal-clear motivation sections!).
>>>>>>>>
>>>>>>>> This is quite an interesting idea, and I do like the API cleanness you proposed. The original motivation for letting StreamsTopology add state stores, though, is to allow different processors to share a state store. For example:
>>>>>>>>
>>>>>>>> builder.addStore("store1");
>>>>>>>>
>>>>>>>> // a path of stream transformations that leads to KStream stream1.
>>>>>>>> stream1.transform(..., "store1");
>>>>>>>>
>>>>>>>> // another path that generates a KStream stream2.
>>>>>>>> stream2.transform(..., "store1");
>>>>>>>>
>>>>>>>> Behind the scenes, Streams makes sure the stream1 / stream2 transformations are always grouped together as a single group of tasks, each of which is executed by a single thread, so there are no concurrency issues when different operators within the same task access the store. I'm not sure how common this use case is, but I'd like to hear your thoughts on maintaining it, since the current proposal seems to exclude this possibility.
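A toy illustration of why a shared store ties the two transformations together: if processors and stores are treated as nodes, and each processor is merged with every store it is connected to (a plain union-find), any two processors that share a store end up in the same group, which is roughly what "a single group of tasks" refers to. `NodeGrouper` and its methods are hypothetical; this is not Kafka's internal implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal union-find (disjoint-set) over processor and store names.
final class NodeGrouper {
    private final Map<String, String> parent = new HashMap<>();

    // Returns the representative of the node's group, creating a singleton group if needed.
    String find(String node) {
        parent.putIfAbsent(node, node);
        String root = node;
        while (!root.equals(parent.get(root))) {
            root = parent.get(root);
        }
        // Path compression: point every node on the path directly at the root.
        String current = node;
        while (!current.equals(root)) {
            String next = parent.get(current);
            parent.put(current, root);
            current = next;
        }
        return root;
    }

    void union(String a, String b) {
        String rootA = find(a);
        String rootB = find(b);
        if (!rootA.equals(rootB)) {
            parent.put(rootA, rootB);
        }
    }

    // Connecting a processor to a store merges their groups.
    // The "store:" prefix keeps store names from colliding with processor names.
    void connectStore(String processorName, String storeName) {
        union(processorName, "store:" + storeName);
    }

    boolean sameGroup(String processorA, String processorB) {
        return find(processorA).equals(find(processorB));
    }
}
```

For the example in the message, connecting both the stream1 and stream2 transformations to "store1" places them in one group, while a transformation that only uses some other store stays separate.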
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Paul
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>
>> --
>> -- Guozhang

--
-- Guozhang