Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?
The last argument is a vararg, and thus, just keeping the existing API
for this part seems to work too, allowing users to implement both patterns?

Also, instead of adding a default method, we could add a new
interface `StoreBuilderSupplier` with method `List<StoreBuilder>
stateStores()` -- users could implement `TransformerSupplier` and
`StoreBuilderSupplier` at once; and for this case, we would require that
users don't provide store names in `transform()`.

Similarly, we could add an interface `StoreNameSupplier` with method
`List<String> stateStores()`. This would allow us to "auto-wire" a
transformer to existing stores (avoiding the issue of adding the same
store multiple times).

Hence, for shared stores, there would be one "main" transformer that
implements `StoreBuilderSupplier` and that must be added first to the
topology. The other transformers would implement `StoreNameSupplier` and
just connect to those stores.

Another possibility to avoid the issue of adding the same stores
multiple times would be that the DSL always calls `addStateStore()` but
catches a potential "store exists already" exception and falls back to
`connectProcessorAndStateStore()` for this case. Thus, we would not need
the `StoreNameSupplier` interface, and the order in which transformers
are added would not matter either. The only disadvantage I see might be
potential bugs from sharing state if two different stores are named the
same by mistake (this would not be detected).

Just some ideas I wanted to share. What do you think?


-Matthias

On 12/11/18 3:46 AM, Paul Whalen wrote:
> Ah yes of course, this was an oversight, I completely ignored the multiple
> processors sharing the same state store when writing up the KIP. Which is
> funny, because I've actually done this (different processors sharing state
> stores) a fair amount myself, and I've settled on a pattern where I group
> the Processors in an enclosing class, and that enclosing class handles as
> much as possible. Here's a gist showing the rough structure, just for
> context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> Note how it adds the stores to the topology, as well as providing a
> public method with the store names.
>
> I don't think my proposal completely conflicts with the multiple processors
> sharing state stores use case, since you can create a supplier that
> provides the store name you want, somewhat independently of your actual
> Processor logic. The issue I do see though, is that
> topology.addStateStore() can only be called once for a given store. So for
> your example, if there was a single TransformerSupplier that was passed
> into both transform() calls, "store1" would be added (under the hood) to
> the topology twice, which is no good.
>
> Perhaps this suggests that one of my alternatives on the KIP might be
> desirable: either not having the suppliers return StoreBuilders (just store
> names), or not deprecating the old methods that take "String...
> stateStoreNames". I'll have to think about it a bit.
>
> Paul
>
> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Paul,
>>
>> Thanks for the great writeup (very detailed and crystal motivation
>> sections!).
>>
>> This is quite an interesting idea and I do like the API cleanness you
>> proposed. The original motivation of letting StreamsTopology to add state
>> stores though, is to allow different processors to share the state store.
>> For example:
>>
>> builder.addStore("store1");
>>
>> // a path of stream transformations that leads to KStream stream1.
>> stream1.transform(..., "store1");
>>
>> // another path that generates a KStream stream2.
>> stream2.transform(..., "store1");
>>
>> Behind the scene, Streams will make sure stream1 / stream2 transformations
>> will always be grouped together as a single group of tasks, each of which
>> will be executed by a single thread and hence there's no concurrency issues
>> on accessing the store from different operators within the same task. I'm
>> not sure how common this use case is, but I'd like to hear if you have any
>> thoughts maintaining this since the current proposal seems to exclude this
>> possibility.
>>
>>
>> Guozhang
>>
>>
>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>
>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
>>> think could greatly increase the usability of the low-level processor API.
>>> I have some code written but will wait to see if there is buy in before
>>> going all out and creating a pull request. It seems like most of the work
>>> would be in updating documentation and tests.
>>>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>
>>> Thanks!
>>> Paul
>>>
>>
>>
>> --
>> -- Guozhang
>>
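PS: To make the two ideas from the top of this mail a bit more concrete, here is a rough, self-contained Java sketch. It is a toy model under stated assumptions, not Kafka Streams code: `ToyTopology`, `ToyStoreBuilder`, `wireBySupplier()` and `addOrConnect()` are hypothetical names invented for illustration, `IllegalStateException` stands in for whatever "store exists already" error the real topology would raise, and the two supplier interfaces only mirror the method shapes suggested above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: none of these types are Kafka Streams classes.
final class SharedStoreSketch {

    /** Stand-in for StoreBuilder; only the store name matters here. */
    static final class ToyStoreBuilder {
        final String name;
        ToyStoreBuilder(String name) { this.name = name; }
    }

    /** Suggested interface: the "main" transformer brings its own stores. */
    interface StoreBuilderSupplier {
        List<ToyStoreBuilder> stateStores();
    }

    /** Suggested interface: other transformers only name existing stores. */
    interface StoreNameSupplier {
        List<String> stateStores();
    }

    /** Stand-in for the topology: tracks which processors use which store. */
    static final class ToyTopology {
        private final Map<String, Set<String>> processorsByStore = new HashMap<>();

        /** Registers a new store and connects it; fails if the name is taken. */
        void addStateStore(ToyStoreBuilder builder, String processor) {
            if (processorsByStore.containsKey(builder.name)) {
                throw new IllegalStateException("store exists already: " + builder.name);
            }
            processorsByStore.put(builder.name, new TreeSet<>());
            connectProcessorAndStateStore(processor, builder.name);
        }

        /** Connects a processor to a store that was added earlier. */
        void connectProcessorAndStateStore(String processor, String storeName) {
            Set<String> processors = processorsByStore.get(storeName);
            if (processors == null) {
                throw new IllegalStateException("unknown store: " + storeName);
            }
            processors.add(processor);
        }

        Set<String> processorsFor(String storeName) {
            return processorsByStore.get(storeName);
        }
    }

    /** Idea 1: wire by interface; the StoreBuilderSupplier must be added first. */
    static void wireBySupplier(ToyTopology topology, String processor, Object supplier) {
        if (supplier instanceof StoreBuilderSupplier) {
            for (ToyStoreBuilder builder : ((StoreBuilderSupplier) supplier).stateStores()) {
                topology.addStateStore(builder, processor);
            }
        } else if (supplier instanceof StoreNameSupplier) {
            for (String name : ((StoreNameSupplier) supplier).stateStores()) {
                topology.connectProcessorAndStateStore(processor, name);
            }
        }
    }

    /** Idea 2: always try to add; on "exists already", fall back to connecting. */
    static void addOrConnect(ToyTopology topology, String processor, ToyStoreBuilder builder) {
        try {
            topology.addStateStore(builder, processor);
        } catch (IllegalStateException storeExists) {
            topology.connectProcessorAndStateStore(processor, builder.name);
        }
    }

    public static void main(String[] args) {
        // Idea 1: order matters, the store owner goes first.
        ToyTopology first = new ToyTopology();
        StoreBuilderSupplier owner = () -> Arrays.asList(new ToyStoreBuilder("store1"));
        StoreNameSupplier follower = () -> Arrays.asList("store1");
        wireBySupplier(first, "transform-1", owner);
        wireBySupplier(first, "transform-2", follower);
        System.out.println(first.processorsFor("store1"));

        // Idea 2: order does not matter, both calls pass a builder.
        ToyTopology second = new ToyTopology();
        addOrConnect(second, "transform-2", new ToyStoreBuilder("store1"));
        addOrConnect(second, "transform-1", new ToyStoreBuilder("store1"));
        System.out.println(second.processorsFor("store1"));
    }
}
```

Note that `addOrConnect()` shows the downside mentioned above as well: two unrelated builders that happen to use the name "store1" would silently end up sharing one store, because the fallback cannot tell an intended share from a naming mistake.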