I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:
- Will there be a new, separate interface for users to implement for the new functionality? No. To hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation, which returns an empty list.
- Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
- Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
- How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent the issue of accidentally merging two state stores into one by adding them under the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().

So, in summary, the use cases look like:
- 1 transformer/processor that owns its store: uses the new StateStoresSupplier interface method to supply its StoreBuilders, which will be added to the topology automatically.
- Multiple transformers/processors that share the same store: either
  1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations, and is added to the topology manually by the user
  2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it

This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.

Thanks!
Paul

On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Matthias / Paul,
>
> The concern I had about introducing `StoreBuilderSupplier` is simply
> because it is another XXSupplier to the public API, so I'd like to ask if
> we really have to add it :)
>
> The difference between encapsulating the store name and encapsulating the
> full state store builder is that, in the former:
>
> -----------
>
> String storeName = "store1";
> builder.addStore(new MyStoreBuilder(storeName));
> // following my proposal, that the store name can be passed in and used
> // for both `listStores` and in the `Transformer#init`; so the Transformer
> // function does not need to get the constant string name again.
> stream1.transform(new MyTransformerSupplier(storeName));
>
> // one caveat to admit, is that MyTransformerSupplier logic may be just
> // unique to `store1` so it cannot be reused with a different store name
> // anyways.
> -----------
>
> While in the latter:
>
> -----------
>
> // the name is just indicating that we may have one such supplier for
> // each store.
> stream1.transform(new MyTransformerSupplierForStore1());
>
> -----------
>
> I understand the latter introduces more convenience from the API, but the
> cost is that we still cannot completely remove `builder.addStore`, but only
> reduce its semantic scope to shared state stores only; hence users need to
> learn two ways of creating state stores for those two patterns.
>
> My argument is that more public APIs require a longer learning curve for
> users, and introduce more usage patterns that may confuse users (the
> proposal I had tries to replace one with another completely).
>
>
> Guozhang
>
> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>
> > Thanks for the great thoughts Matthias and Guozhang!
> >
> > If I'm not mistaken, Guozhang's suggestion is what my second alternative
> > on the KIP is ("Have the added method on the Supplier interfaces only
> > return store names, not builders"). I do think it would be a worthwhile
> > usability improvement on its own, but to Matthias's point, it doesn't
> > achieve the full goal of completely encapsulating a state store and its
> > processor - it encapsulates the name, but not the StoreBuilder.
> >
> > I'm really intrigued by Matthias's idea that forgoes the default
> > interface method I proposed. Having smaller, separate interfaces is a
> > powerful idea and I think a cleaner API than what I proposed. The
> > non-shared store use case is handled well here, and the shared store use
> > case is possible, though maybe still not as graceful as we would like
> > (having to add the StoreBuilderSupplier before the StoreNameSupplier
> > seems maybe too subtle to me).
> >
> > We're all agreed that one of the big problems with the shared store use
> > case is how to deal with adding the same store to the topology multiple
> > times. Catching the "store already added" exception is risky. Here's a
> > maybe radical idea: change `topology.addStateStore()` to be idempotent
> > for adding a given state store name and `StoreBuilder`. In other words,
> > `addStateStore` would not throw the "store already added" exception if
> > the `StoreBuilder` being added for a given name has the same identity as
> > the one that has already been added. Does this eliminate all the bugs
> > we're worried about?
> > Thinking about it for a few minutes, it seems to eliminate
> > most at least (would a user really use the exact same StoreBuilder when
> > they intend there to be two stores?). It might make the API slightly
> > harder to use if a user isn't immediately aware of that subtlety, but a
> > good error message should ease the pain, and it would happen immediately
> > during development.
> >
> > And with regards to Matthias's comment about whether we need to deprecate
> > existing varargs transform methods - I don't think we need to, but it
> > might be nice for there only to be one way to do things, assuming
> > whatever we come up with supports all existing use cases. I don't feel
> > strongly about this, but if we don't deprecate, I do think it's important
> > to add checks that prevent users from trying to do the same thing in two
> > different ways, as we've discussed.
> >
> > Paul
> >
> > On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Guozhang,
> > >
> > > >> Regarding the last option to catch "store exist already" exception
> > > >> and fallback to connect stores, I'm a bit concerned it may be hiding
> > > >> actual user bugs.
> > >
> > > I agree with this concern. From my original email:
> > >
> > > > The only disadvantage I see, might be
> > > > potential bugs about sharing state if two different stores are named
> > > > the same by mistake (this would not be detected).
> > >
> > >
> > > For your new proposal: I am not sure if it addresses Paul's original
> > > idea -- I hope Paul can clarify. From my understanding, the idea was to
> > > encapsulate a store and its processor. As many stores are not shared,
> > > this seems to be quite useful. Your proposal falls a little short of
> > > supporting encapsulation for non-shared stores.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 12/15/18 1:40 AM, Guozhang Wang wrote:
> > > > Matthias,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > Regarding the last option to catch "store exist already" exception
> > > > and fallback to connect stores, I'm a bit concerned it may be hiding
> > > > actual user bugs.
> > > >
> > > > Thinking about Paul's proposal and your suggestion again, I'd like to
> > > > propose another alternative somewhere in the middle of your
> > > > approaches, i.e. we still let users create sharable state stores via
> > > > `addStateStore`, and we allow the TransformerSupplier to return a
> > > > list of state stores that it needs, i.e.:
> > > >
> > > > public interface TransformerSupplier<K, V, R> {
> > > >     Transformer<K, V, R> get();
> > > >     default List<String> stateStoreNames() {
> > > >         return Collections.emptyList();
> > > >     }
> > > > }
> > > >
> > > > by doing this users can still "consolidate" the references of store
> > > > names in a single place in the transform call, e.g.:
> > > >
> > > > public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
> > > >     private String storeName;
> > > >
> > > >     public class MyTransformer<K, V, R> {
> > > >
> > > >         ....
> > > >
> > > >         public void init(ProcessorContext context) {
> > > >             store = context.getStateStore(storeName);
> > > >         }
> > > >     }
> > > >
> > > >     @Override
> > > >     public List<String> stateStoreNames() {
> > > >         return Collections.singletonList(storeName);
> > > >     }
> > > > }
> > > >
> > > > Basically, we move the parameters from the caller of `transform` to
> > > > inside the TransformerSuppliers. DSL implementations would not change
> > > > much, simply calling `connectStateStore` by getting the list of names
> > > > from the provided function.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Dec 13, 2018 at 7:27 AM Matthias J.
> > > > Sax <matth...@confluent.io> wrote:
> > > >
> > > >> Just a meta comment: do we really need to deprecate existing
> > > >> `transform()` etc methods?
> > > >>
> > > >> The last argument is a vararg, and thus, just keeping the existing
> > > >> API for this part seems to work too, allowing both patterns to be
> > > >> implemented?
> > > >>
> > > >> Also, instead of adding a default method, we could also add a new
> > > >> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> > > >> stateStores()` -- users could implement `TransformerSupplier` and
> > > >> `StoreBuilderSupplier` at once; and for this case, we require that
> > > >> users don't provide store names in `transform()`.
> > > >>
> > > >> Similarly, we could add an interface `StoreNameSupplier` with method
> > > >> `List<String> stateStores()`. This allows us to "auto-wire" a
> > > >> transformer to existing stores (to avoid the issue of adding the same
> > > >> store multiple times).
> > > >>
> > > >> Hence, for shared stores, there would be one "main" transformer that
> > > >> implements `StoreBuilderSupplier` and that must be added first to the
> > > >> topology. The other transformers would implement `StoreNameSupplier`
> > > >> and just connect to those stores.
> > > >>
> > > >> Another possibility to avoid the issue of adding the same stores
> > > >> multiple times would be that the DSL always calls `addStateStore()`
> > > >> but catches a potential "store exists already" exception and falls
> > > >> back to `connectProcessorAndStateStores()` for this case. Thus, we
> > > >> would not need the `StoreNameSupplier` interface and the order in
> > > >> which transformers are added would not matter either. The only
> > > >> disadvantage I see might be potential bugs about sharing state if two
> > > >> different stores are named the same by mistake (this would not be
> > > >> detected).
> > > >>
> > > >>
> > > >>
> > > >> Just some ideas I wanted to share.
> > > >> What do you think?
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 12/11/18 3:46 AM, Paul Whalen wrote:
> > > >>> Ah yes of course, this was an oversight, I completely ignored the
> > > >>> multiple processors sharing the same state store when writing up the
> > > >>> KIP. Which is funny, because I've actually done this (different
> > > >>> processors sharing state stores) a fair amount myself, and I've
> > > >>> settled on a pattern where I group the Processors in an enclosing
> > > >>> class, and that enclosing class handles as much as possible. Here's
> > > >>> a gist showing the rough structure, just for context:
> > > >>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> > > >>> Note how it adds the stores to the topology, as well as providing a
> > > >>> public method with the store names.
> > > >>>
> > > >>> I don't think my proposal completely conflicts with the multiple
> > > >>> processors sharing state stores use case, since you can create a
> > > >>> supplier that provides the store name you want, somewhat
> > > >>> independently of your actual Processor logic. The issue I do see
> > > >>> though, is that topology.addStateStore() can only be called once for
> > > >>> a given store. So for your example, if there was a single
> > > >>> TransformerSupplier that was passed into both transform() calls,
> > > >>> "store1" would be added (under the hood) to the topology twice,
> > > >>> which is no good.
> > > >>>
> > > >>> Perhaps this suggests that one of my alternatives on the KIP might
> > > >>> be desirable: either not having the suppliers return StoreBuilders
> > > >>> (just store names), or not deprecating the old methods that take
> > > >>> "String... stateStoreNames". I'll have to think about it a bit.
> > > >>>
> > > >>> Paul
> > > >>>
> > > >>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hello Paul,
> > > >>>>
> > > >>>> Thanks for the great writeup (very detailed and crystal-clear
> > > >>>> motivation sections!).
> > > >>>>
> > > >>>> This is quite an interesting idea and I do like the API cleanness
> > > >>>> you proposed. The original motivation of letting StreamsTopology
> > > >>>> add state stores though, is to allow different processors to share
> > > >>>> the state store. For example:
> > > >>>>
> > > >>>> builder.addStore("store1");
> > > >>>>
> > > >>>> // a path of stream transformations that leads to KStream stream1.
> > > >>>> stream1.transform(..., "store1");
> > > >>>>
> > > >>>> // another path that generates a KStream stream2.
> > > >>>> stream2.transform(..., "store1");
> > > >>>>
> > > >>>> Behind the scenes, Streams will make sure stream1 / stream2
> > > >>>> transformations will always be grouped together as a single group
> > > >>>> of tasks, each of which will be executed by a single thread and
> > > >>>> hence there are no concurrency issues on accessing the store from
> > > >>>> different operators within the same task. I'm not sure how common
> > > >>>> this use case is, but I'd like to hear if you have any thoughts on
> > > >>>> maintaining this since the current proposal seems to exclude this
> > > >>>> possibility.
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
> > > >>>>> that I think could greatly increase the usability of the low-level
> > > >>>>> processor API. I have some code written but will wait to see if
> > > >>>>> there is buy in before going all out and creating a pull request.
> > > >>>>> It seems like most of the work would be in updating documentation
> > > >>>>> and tests.
> > > >>>>>
> > > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>> Paul
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
>
> --
> -- Guozhang
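To make the summary at the top of this message a bit more concrete, here is a minimal, self-contained sketch of the two ideas it relies on: a supplier that declares its own StoreBuilders through a default method, and an addStateStore() that tolerates the exact same StoreBuilder instance being added again. It deliberately does not use the real Kafka Streams classes. StoreBuilder, StateStoresSupplier and ProcessorSupplier below are simplified stand-ins, and MiniTopology, NamedStoreBuilder, SimpleSupplier, processorName() and SharedStoreDemo are hypothetical names invented for illustration. The proof of concept in the pull request linked above may well differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a store builder; NOT the real Kafka Streams type.
interface StoreBuilder {
    String name();
}

// Assumed shape of the proposed interface: by default no stores are declared.
interface StateStoresSupplier {
    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Stand-in supplier; processorName() is hypothetical, used only for bookkeeping.
interface ProcessorSupplier extends StateStoresSupplier {
    String processorName();
}

final class NamedStoreBuilder implements StoreBuilder {
    private final String name;
    NamedStoreBuilder(String name) { this.name = name; }
    @Override public String name() { return name; }
}

final class SimpleSupplier implements ProcessorSupplier {
    private final String processorName;
    private final List<StoreBuilder> stores;
    SimpleSupplier(String processorName, StoreBuilder... stores) {
        this.processorName = processorName;
        this.stores = Arrays.asList(stores);
    }
    @Override public String processorName() { return processorName; }
    @Override public List<StoreBuilder> stateStores() { return stores; }
}

// Hypothetical miniature topology that only tracks stores and their users.
final class MiniTopology {
    private final Map<String, StoreBuilder> stores = new LinkedHashMap<>();
    private final Map<String, List<String>> users = new LinkedHashMap<>();

    // Relaxed rule: re-adding the very same instance is a no-op, while a
    // different builder under an already-used name is still an error.
    void addStateStore(StoreBuilder builder) {
        StoreBuilder existing = stores.putIfAbsent(builder.name(), builder);
        if (existing != null && existing != builder) {
            throw new IllegalStateException(
                "Store " + builder.name() + " was already added with a different StoreBuilder");
        }
    }

    // Adds every store the supplier declares, then records the connection.
    void process(ProcessorSupplier supplier) {
        for (StoreBuilder builder : supplier.stateStores()) {
            addStateStore(builder);
            users.computeIfAbsent(builder.name(), k -> new ArrayList<>())
                 .add(supplier.processorName());
        }
    }

    int storeCount() { return stores.size(); }

    List<String> processorsFor(String storeName) {
        return users.getOrDefault(storeName, Collections.emptyList());
    }
}

final class SharedStoreDemo {
    // Two suppliers return the same builder instance, i.e. "the new way" of sharing.
    static MiniTopology build() {
        StoreBuilder shared = new NamedStoreBuilder("store1");
        MiniTopology topology = new MiniTopology();
        topology.process(new SimpleSupplier("p1", shared));
        topology.process(new SimpleSupplier("p2", shared));
        return topology;
    }

    public static void main(String[] args) {
        MiniTopology topology = build();
        System.out.println(topology.storeCount() + " store(s); store1 used by "
            + topology.processorsFor("store1"));
    }
}
```

The identity comparison (`existing != builder`) is the whole point of the sketch: two suppliers that hand back the same instance clearly mean the same store, while a second, distinct builder that merely reuses the name is still rejected right away.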