Thanks for the great thoughts, Matthias and Guozhang! If I'm not mistaken, Guozhang's suggestion is the second alternative in my KIP ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StoreBuilder.
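Here is a minimal sketch of the difference. It uses hypothetical stand-in types, not the real Kafka Streams `StoreBuilder` and `Topology` classes. `StoreBuilderStub`, `MiniTopology`, and `wire()` are invented for illustration. The two supplier interfaces borrow their names from Matthias's Dec 13 suggestion quoted below. A builder-returning supplier can add and connect its store by itself. A names-only supplier can only connect to a store that something else has already added.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a store definition (NOT org.apache.kafka.streams.state.StoreBuilder).
interface StoreBuilderStub {
    String name();
}

// Builder-returning shape: the supplier owns the full store definition.
interface StoreBuilderSupplier {
    List<StoreBuilderStub> stateStores();
}

// Names-only shape: the supplier can only point at stores registered elsewhere.
interface StoreNameSupplier {
    List<String> stateStores();
}

// Hypothetical miniature topology recording stores and processor-to-store connections.
final class MiniTopology {
    final Map<String, StoreBuilderStub> stores = new LinkedHashMap<>();
    final Map<String, List<String>> connections = new LinkedHashMap<>();

    void addStateStore(StoreBuilderStub builder) {
        if (stores.putIfAbsent(builder.name(), builder) != null) {
            throw new IllegalStateException("store already added: " + builder.name());
        }
    }

    void connect(String processor, String storeName) {
        if (!stores.containsKey(storeName)) {
            throw new IllegalStateException("store not added yet: " + storeName);
        }
        connections.computeIfAbsent(processor, p -> new ArrayList<>()).add(storeName);
    }

    // What a DSL transform() call might do with each kind of supplier (assumption).
    void wire(String processor, Object supplier) {
        if (supplier instanceof StoreBuilderSupplier) {
            for (StoreBuilderStub b : ((StoreBuilderSupplier) supplier).stateStores()) {
                addStateStore(b);            // the supplier carries the definition
                connect(processor, b.name());
            }
        } else if (supplier instanceof StoreNameSupplier) {
            for (String name : ((StoreNameSupplier) supplier).stateStores()) {
                connect(processor, name);    // someone else must have added the store
            }
        }
    }
}
```

Wiring the names-only supplier before the builder-returning one fails with "store not added yet". That is the ordering subtlety raised in the next paragraph.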
I'm really intrigued by Matthias's idea, which forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea, and I think it gives a cleaner API than what I proposed. The non-shared store use case is handled well here. The shared store use case is possible, though maybe still not as graceful as we would like: having to add the StoreBuilderSupplier before the StoreNameSupplier seems too subtle to me.

We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name is the same instance (by identity) as the one that has already been added.

Does this eliminate all the bugs we're worried about? After thinking about it for a few minutes, it seems to eliminate most of them, at least. Would a user really reuse the exact same StoreBuilder instance when they intend there to be two stores? It might make the API slightly harder to use if a user isn't immediately aware of that subtlety. Still, a good error message should ease the pain, and the error would surface immediately during development.

With regard to Matthias's comment about whether we need to deprecate the existing varargs transform methods: I don't think we need to, but it might be nice for there to be only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this. If we don't deprecate, though, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.

Paul

On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Guozhang,
>
> >> Regarding the last option to catch "store exist already" exception and
> >> fallback to connect stores, I'm a bit concerned it may be hiding actual
> >> user bugs.
>
> I agree with this concern. From my original email:
>
> > The only disadvantage I see, might be
> > potential bugs about sharing state if two different stores are named the
> > same by mistake (this would not be detected).
>
>
> For your new proposal: I am not sure if it addresses Paul's original
> idea -- I hope Paul can clarify. From my understanding, the idea was to
> encapsulate a store and its processor. As many stores are not shared,
> this seems to be quite useful. Your proposal falls a little short to
> support encapsulation for non-shared stores.
>
>
> -Matthias
>
>
>
> On 12/15/18 1:40 AM, Guozhang Wang wrote:
> > Matthias,
> >
> > Thanks for your feedback.
> >
> > Regarding the last option to catch "store exist already" exception and
> > fallback to connect stores, I'm a bit concerned it may be hiding actual
> > user bugs.
> >
> > Thinking about Paul's proposal and your suggestion again, I'd like to
> > propose another alternative somewhere in the middle of your approaches,
> > i.e. we still let users create sharable state stores via
> > `addStateStore`, and we allow the TransformerSupplier to return a list of
> > state stores that it needs, i.e.:
> >
> > public interface TransformerSupplier<K, V, R> {
> >     Transformer<K, V, R> get();
> >     default List<String> stateStoreNames() {
> >         return Collections.emptyList();
> >     }
> > }
> >
> > by doing this users can still "consolidate" the references of store names
> > in a single place in the transform call, e.g.:
> >
> > public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
> >     private String storeName;
> >
> >     @Override
> >     public Transformer<K, V, R> get() {
> >         return new MyTransformer();
> >     }
> >
> >     public class MyTransformer implements Transformer<K, V, R> {
> >
> >         ....
> >
> >         @Override
> >         public void init(ProcessorContext context) {
> >             store = context.getStateStore(storeName);
> >         }
> >     }
> >
> >     @Override
> >     public List<String> stateStoreNames() {
> >         return Collections.singletonList(storeName);
> >     }
> > }
> >
> > Basically, we move the parameters from the caller of `transform` to inside
> > the TransformerSuppliers. DSL implementations would not change much, simply
> > calling `connectStateStore` by getting the list of names from the provided
> > function.
> >
> > Guozhang
> >
> >
> > On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Just a meta comment: do we really need to deprecate existing
> >> `transform()` etc methods?
> >>
> >> The last argument is a vararg, and thus, just keeping the existing API
> >> for this part seems to work too, allowing to implement both patterns?
> >>
> >> Also, instead of adding a default method, we could also add a new
> >> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> >> stateStores()` -- users could implement `TransformerSupplier` and
> >> `StoreBuilderSupplier` at once; and for this case, we require that users
> >> don't provide store name in `transform()`.
> >>
> >> Similar, we could add an interface `StoreNameSupplier` with method
> >> `List<String> stateStores()`. This allows to "auto-wire" a transformer
> >> to existing stores (to avoid the issue to add the same store multiple
> >> times).
> >>
> >> Hence, for shared stores, there would be one "main" transformer that
> >> implements `StoreBuilderSupplier` and that must be added first to the
> >> topology. The other transformers would implement `StoreNameSupplier` and
> >> just connect to those stores.
> >>
> >> Another possibility to avoid the issue of adding the same stores
> >> multiple times would be, that the DSL always calls `addStateStore()` but
> >> catches a potential "store exists already" exception and falls back to
> >> `connectProcessorAndStateStore()` for this case. Thus, we would not need
> >> the `StoreNameSupplier` interface and the order in which transformers
> >> are added would not matter either. The only disadvantage I see, might be
> >> potential bugs about sharing state if two different stores are named the
> >> same by mistake (this would not be detected).
> >>
> >> Just some ideas I wanted to share. What do you think?
> >>
> >> -Matthias
> >>
> >> On 12/11/18 3:46 AM, Paul Whalen wrote:
> >>> Ah yes of course, this was an oversight, I completely ignored the multiple
> >>> processors sharing the same state store when writing up the KIP. Which is
> >>> funny, because I've actually done this (different processors sharing state
> >>> stores) a fair amount myself, and I've settled on a pattern where I group
> >>> the Processors in an enclosing class, and that enclosing class handles as
> >>> much as possible. Here's a gist showing the rough structure, just for
> >>> context:
> >>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> >>> Note how it adds the stores to the topology, as well as providing a
> >>> public method with the store names.
> >>>
> >>> I don't think my proposal completely conflicts with the multiple processors
> >>> sharing state stores use case, since you can create a supplier that
> >>> provides the store name you want, somewhat independently of your actual
> >>> Processor logic. The issue I do see though, is that
> >>> topology.addStateStore() can only be called once for a given store. So for
> >>> your example, if there was a single TransformerSupplier that was passed
> >>> into both transform() calls, "store1" would be added (under the hood) to
> >>> the topology twice, which is no good.
> >>>
> >>> Perhaps this suggests that one of my alternatives on the KIP might be
> >>> desirable: either not having the suppliers return StoreBuilders (just store
> >>> names), or not deprecating the old methods that take "String...
> >>> stateStoreNames". I'll have to think about it a bit.
> >>>
> >>> Paul
> >>>
> >>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Paul,
> >>>>
> >>>> Thanks for the great writeup (very detailed and crystal-clear motivation
> >>>> sections!).
> >>>>
> >>>> This is quite an interesting idea and I do like the API cleanness you
> >>>> proposed. The original motivation of letting StreamsTopology add state
> >>>> stores though, is to allow different processors to share the state store.
> >>>> For example:
> >>>>
> >>>> builder.addStore("store1");
> >>>>
> >>>> // a path of stream transformations that leads to KStream stream1.
> >>>> stream1.transform(..., "store1");
> >>>>
> >>>> // another path that generates a KStream stream2.
> >>>> stream2.transform(..., "store1");
> >>>>
> >>>> Behind the scenes, Streams will make sure stream1 / stream2 transformations
> >>>> will always be grouped together as a single group of tasks, each of which
> >>>> will be executed by a single thread, and hence there are no concurrency
> >>>> issues on accessing the store from different operators within the same
> >>>> task. I'm not sure how common this use case is, but I'd like to hear if you
> >>>> have any thoughts on maintaining this, since the current proposal seems to
> >>>> exclude this possibility.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>
> >>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> >>>>> think could greatly increase the usability of the low-level processor API.
> >>>>> I have some code written but will wait to see if there is buy-in before
> >>>>> going all out and creating a pull request. It seems like most of the work
> >>>>> would be in updating documentation and tests.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >>>>>
> >>>>> Thanks!
> >>>>> Paul
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
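The idea at the top of this reply makes `addStateStore()` a no-op when the identical `StoreBuilder` instance is re-added under the same name. It can be sketched as a small registry. This is a hypothetical illustration, not Kafka Streams code. `IdempotentStoreRegistry` is an invented name, and the builder is a type parameter so the sketch runs without Kafka on the classpath. It compares by identity (`==`) rather than `equals()`, because the proposal is specifically about the same builder object being passed twice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch of an idempotent addStateStore(); NOT Kafka Streams code.
 * B stands in for a StoreBuilder so the example runs without Kafka.
 */
final class IdempotentStoreRegistry<B> {
    private final Map<String, B> buildersByName = new HashMap<>();

    /**
     * Returns true if the store was newly registered, and false if this exact
     * builder instance was already registered under this name (a no-op).
     * Throws if a different builder instance already owns the name.
     */
    boolean addStateStore(String name, B builder) {
        Objects.requireNonNull(builder, "builder");
        B existing = buildersByName.putIfAbsent(name, builder);
        if (existing == null) {
            return true;              // first registration for this name
        }
        if (existing == builder) {
            return false;             // same object re-added: treat as idempotent
        }
        throw new IllegalArgumentException(
            "State store '" + name + "' was already added with a different StoreBuilder. "
                + "To share the store, pass the same StoreBuilder instance; "
                + "to create a second store, give it a different name.");
    }

    int size() {
        return buildersByName.size();
    }
}
```

Consider Guozhang's `stream1`/`stream2` example. One supplier passed to both `transform()` calls would hand back the same builder twice, provided it caches the builder rather than constructing a new one on every call. The second registration then returns `false`, and the DSL would only need to connect the processor. Two independently constructed builders that happen to share a name still fail fast, with an error message that points at the fix.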