Hi, @Paul: Thank you for the KIP!
I hope you do not mind that I jump in. I have the following comments:

1) `null` vs empty list in the default implementation
IIUC, returning `null` in the default implementation should basically signal that the method `stateStores` was not overridden. Why provide a default implementation in the first place, then? Without a default implementation you would discover the missing implementation at compile time rather than at runtime. However, if you decide not to provide a default implementation, `XSupplier extends StateStoreConnector` would break existing code, as Matthias has already pointed out.

2) `process` method adding the StoreBuilders to the topology
If the default implementation returned `null` and `XSupplier extends StateStoreConnector`, then existing code would break, because `StreamsBuilder#addStateStore()` would throw an NPE.

+1 for opening a WIP PR

Best,
Bruno

On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io> wrote:
> Thanks Paul!
>
> I agree with all of that. If we think that the general design is good, refactoring a PR if we want to pick a different name should not be too much additional work (hopefully). Thus, if you want to open a WIP PR and we use it to nail down the open details, it might help us reach a good conclusion.
>
> >> 2) Default method vs new interface:
>
> This seems to be the hardest tradeoff. I see the point about discoverability... It might be good to get input from others on which version they would prefer.
>
> Just to make it clear, my suggestion from the last email was that `Transformer` etc. does not extend the new interface. Instead, a user who wants to use this feature would need to implement both interfaces.
>
> If `Transformer extends StoreProvider` (just picking a name here) without a default implementation, existing code would break, and thus it is not an option because it breaks backward compatibility.
>
> -Matthias
>
> On 4/28/19 8:37 PM, Paul Whalen wrote:
> > Great thoughts Matthias, thanks!
> > I think we're all agreed that naming and documentation/education are the biggest hurdles for this KIP, and in light of that, I think it makes sense for me to just take a stab at a full-fledged PR with documentation to convince us that it's possible to do it with enough clarity.
> >
> > In response to your specific thoughts:
> >
> > 1) StateStoreConnector as a name: Really good point about defining the difference between "adding" and "connecting." Guozhang suggested StateStoreConnector, which was definitely an improvement over my StateStoresSupplier, but I think you're right that we need to be careful to make it clear that it's really accomplishing both. Thinking about it now, one problem with Connector is that the implementer of the interface is not really doing any connecting; it's providing/supplying the store that will be both added and connected. StoreProvider seems reasonable to me and probably the best candidate at the moment, but it would be nice if the name could convey that it's providing the store specifically so the caller can add it to the topology and connect it to the associated transformer.
> >
> > In general, I think that clearly calling out what "adding" versus "connecting" means in the documentation will help make the entire purpose of this feature clearer to the user.
> >
> > 2) Default method vs new interface: The choice of a default method was influenced by Guozhang's concern about API bloat/discoverability. I can definitely see it both ways. Would the separate interface be a sub-interface of Processor/TransformerSupplier or standalone? It seems like you're suggesting standalone, and I think that's what I favor. My only concern there is that the interface wouldn't actually be a parameter type of any public API, which somewhat hurts discoverability.
> > You would have to read the javadocs for stream.process/transform() to discover that implementing the interface in addition to Processor/TransformerSupplier would add and connect the store for you. But that added burden probably helps us in terms of making sure people don't mix and match, like you said.
> >
> > 3) Returning null instead of empty: Seems fair to me. I always worry about returning null when an empty collection can be used instead, but given that the library is the caller rather than the client, I think your point makes sense here.
> >
> > 4) Returning Set instead of Collection: Agreed, I don't see why not to make it more specific.
> >
> > Paul
> >
> > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Hi, sorry for the long pause. Just trying to catch up here.
> >>
> >> I think it is safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded", and thus it's not really possible to use the same `StoreBuilder` object to create different stores.
> >>
> >> I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.
> >>
> >> Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.
> >>
> >> Some further comments:
> >>
> >> (1) I am not sure if `StateStoreConnector` is the best name for the new interface. Note that there are two concepts for stores:
> >>
> >> - adding a store: this makes the store available in the topology in general (however, the store is still "dangling" and not used)
> >> - connecting a store: this allows a processor etc. to use a store
> >>
> >> The new interface does both, but its name only indicates the second part, which might be confusing.
> >> It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with a new "auto add+connect" pattern. If the new interface name indicates only the connect part, users might think they need to add stores manually and can connect them automatically.
> >>
> >> Unfortunately, I don't have a much better suggestion for a name either. The only idea that came to my mind was `StoreProvider`: to me, a provider is a "service" interface that does work for us, i.e., it adds and connects a store. I'm not sure if this is too subtle, considering that there is already the `StoreSupplier` interface?
> >>
> >> But maybe somebody else has a good idea on how to improve the name.
> >>
> >> In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.
> >>
> >> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc. and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature. I am wondering if it might be better not to add the new interface to `ProcessorSupplier` etc. and to just provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches. Think: "I only overrode a default method and my code broke."
> >>
> >> Thoughts?
> >>
> >> (3) If we keep the current default implementation for the new method, I am wondering if it should return `null` instead of an empty collection?
> >> This might be safer for detecting bugs in user code where an empty collection is returned by accident.
> >>
> >> (4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly (i.e., returning the same `StoreBuilder` multiple times is idempotent, and one cannot add+connect it twice)?
> >>
> >> -Matthias
> >>
> >> On 4/6/19 12:27 PM, Paul Whalen wrote:
> >>> Ivan and Guozhang,
> >>>
> >>> Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.
> >>>
> >>> I will say that I'm not totally sure that most use cases of transform() use just one state store. It's hard to know since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low-level Processor API when we need that complexity, and it's somewhat hard to imagine many use cases that only need one state store, since the high-level DSL can usually accomplish those tasks. The example Ivan presented, for instance, looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what sort of other usages you're imagining.
> >>>
> >>> That being said, perhaps the Processor API should really just be considered a separate paradigm in Streams, not just a lower level that we reach for when necessary. In that case, it would be beneficial to make the simple use cases easier.
> >>> I've definitely talked about this with my own team: if you're less familiar with the kind of functional style that the high-level DSL offers, it might be easier to "see" your state and interact with it directly.
> >>>
> >>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. It seems like there is at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.
> >>>
> >>> Paul
> >>>
> >>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>
> >>>> Hi all!
> >>>>
> >>>> I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.
> >>>>
> >>>> I fully agree with KIP-401's motivation section. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from the business logic. Of course, this should be handled better in the KStreams API.
> >>>>
> >>>> But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL that will not fit all cases, but will fit most of them. Hence my idea does not exclude Paul's proposal.
> >>>>
> >>>> What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:
> >>>>
> >>>>     stream
> >>>>         .map(...)
> >>>>         .filter(...)
> >>>>         .transform((k, v, s) -> {....}, Transformed.with(....))
> >>>>
> >>>> where
> >>>> * k, v -- input key & value
> >>>> * s -- a KeyValueStore provided as an argument
> >>>> * the return value of the lambda should be KeyValue.pair(...)
> >>>> * Transformed.with... is a builder used to define the Transformer and KeyValueStore building parameters.
> >>>> Some of these parameters should be:
> >>>> ** the store's key Serde,
> >>>> ** the store's value Serde,
> >>>> ** whether the store is persistent or in-memory,
> >>>> ** the store's name -- an optional parameter; the system should be able to devise the store name transparently for the user if we don't want to choose it ourselves or share the store between processors,
> >>>> ** scheduled punctuation.
> >>>>
> >>>> Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.
> >>>>
> >>>> This could be achieved as simply as:
> >>>>
> >>>>     stream.transform((key, value, stateStore) -> {
> >>>>             int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
> >>>>             stateStore.put(key, value);
> >>>>             return KeyValue.pair(key, value - previousValue);
> >>>>         },
> >>>>         // we do not need to bother with the store name, punctuation, etc.;
> >>>>         // maybe even the Serde part can be omitted, since we can inherit the serdes from the stream by default
> >>>>         Transformed.with(Serdes.String(), Serdes.Integer()));
> >>>>
> >>>> The hard part is that the new `transform` method definition would have to be parameterized by six type parameters:
> >>>>
> >>>> * input/output/KeyValueStore key type,
> >>>> * input/output/KeyValueStore value type.
> >>>>
> >>>> However, it seems that all these types can be inferred from the provided lambda and the Transformed.with instance.
> >>>>
> >>>> What do you think about this?
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>> On 27.03.2019 20:45, Guozhang Wang wrote:
> >>>>
> >>>> Hello Paul,
> >>>>
> >>>> Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.
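Ivan's "derivative" example above can be approximated without Kafka to show what the proposed `(key, value, store)` lambda would do. This is a minimal sketch under stated assumptions: `Pair`, `StoreTransformer` and `run()` are hypothetical stand-ins (not Kafka Streams API), a `HashMap` plays the role of the single `KeyValueStore`, and the `Transformed.with(...)` serde configuration is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Ivan's single-store transform(); none of these types are Kafka API.
final class StoreTransformDemo {

    // Minimal stand-in for KeyValue<K, V>.
    record Pair<K, V>(K key, V value) {}

    // Shape of the proposed lambda: (key, value, store) -> output pair.
    @FunctionalInterface
    interface StoreTransformer<K, V, S, KR, VR> {
        Pair<KR, VR> apply(K key, V value, Map<K, S> store);
    }

    // Processes records in order against one shared store, as a single task would.
    static <K, V, S, KR, VR> List<Pair<KR, VR>> run(
            List<Pair<K, V>> input, StoreTransformer<K, V, S, KR, VR> fn) {
        Map<K, S> store = new HashMap<>(); // stands in for the KeyValueStore
        List<Pair<KR, VR>> out = new ArrayList<>();
        for (Pair<K, V> rec : input) {
            out.add(fn.apply(rec.key(), rec.value(), store));
        }
        return out;
    }

    // Ivan's example: emit the difference from the previous value seen for each key.
    static List<Pair<String, Integer>> deltas(List<Pair<String, Integer>> input) {
        return run(input, (String key, Integer value, Map<String, Integer> store) -> {
            int previous = store.getOrDefault(key, 0); // like Optional...orElse(0)
            store.put(key, value);
            return new Pair<String, Integer>(key, value - previous);
        });
    }
}
```

For the input ("a", 5), ("a", 8), ("b", 3), the sketch emits deltas of 5, 3 and 3, because each key starts from an implicit previous value of 0.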
> >>>> Overall, I think I agree with you that passing in the StoreBuilder directly rather than the store name is more convenient, as it does not require another `addStore` call, but we will need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this learning curve, but I can be convinced if most people feel it is worthwhile.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>
> >>>> I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP that combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:
> >>>>
> >>>> - Will there be a new, separate interface for users to implement for the new functionality? No. To hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation, which returns an empty list.
> >>>> - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
> >>>> - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores.
> >>>>   But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
> >>>> - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent the issue of accidentally merging two intended state stores into one by adding them under the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().
> >>>>
> >>>> So, in summary, the use cases look like:
> >>>>
> >>>> - One transformer/processor that owns its store: it uses the new StateStoresSupplier interface method to supply its StoreBuilders, which will be added to the topology automatically.
> >>>> - Multiple transformers/processors that share the same store: either
> >>>>   1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations and is added to the topology manually by the user, or
> >>>>   2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it.
> >>>>
> >>>> This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.
> >>>>
> >>>> Thanks!
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>> Matthias / Paul,
> >>>>
> >>>> The concern I had about introducing `StoreBuilderSupplier` is simply that it adds another XXSupplier to the public API, so I'd like to ask if we really have to add it :)
> >>>>
> >>>> The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:
> >>>>
> >>>> -----------
> >>>>
> >>>>     String storeName = "store1";
> >>>>     builder.addStore(new MyStoreBuilder(storeName));
> >>>>     // following my proposal, the store name can be passed in and used both for
> >>>>     // `listStores` and in `Transformer#init`, so the Transformer does not need
> >>>>     // to get the constant string name again
> >>>>     stream1.transform(new MyTransformerSupplier(storeName));
> >>>>
> >>>>     // one caveat to admit is that MyTransformerSupplier's logic may be unique
> >>>>     // to `store1`, so it cannot be reused with a different store name anyway
> >>>>
> >>>> -----------
> >>>>
> >>>> While in the latter:
> >>>>
> >>>> -----------
> >>>>
> >>>>     // the name indicates that we may have one such supplier for each store
> >>>>     stream1.transform(new MyTransformerSupplierForStore1());
> >>>>
> >>>> -----------
> >>>>
> >>>> I understand the latter makes the API more convenient, but the cost is that we still cannot completely remove `builder.addStore`; we can only reduce its semantic scope to shared state stores. Hence users need to learn two ways of creating state stores for those two patterns.
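To make the "latter" option concrete (a supplier that owns its full store definition, which the library then adds and connects), here is a hedged sketch of the opt-in, separate-interface variant Matthias later suggests in the thread. `StoreDef`, `StoreProvider` and `MiniTopology` are hypothetical stand-ins, not Kafka Streams API; the sketch only models the difference between "adding" and "connecting" a store, returns a `Set` as proposed, and rejects mixing a provider with explicit store names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model; none of these types are Kafka Streams API.
final class StoreProviderDemo {

    // Stand-in for a StoreBuilder: the store name is fixed at construction.
    record StoreDef(String name) {}

    // Opt-in interface a supplier implementation could additionally implement.
    interface StoreProvider {
        Set<StoreDef> stores();
    }

    static final class MiniTopology {
        final Map<String, StoreDef> added = new HashMap<>();
        final Map<String, Set<String>> connections = new HashMap<>();

        // "Adding": the store exists in the topology but is still unused.
        void addStateStore(StoreDef def) {
            if (added.putIfAbsent(def.name(), def) != null) {
                throw new IllegalStateException("Store " + def.name() + " already added");
            }
        }

        // "Connecting": a processor gains access to an already-added store.
        void connect(String processor, String storeName) {
            if (!added.containsKey(storeName)) {
                throw new IllegalStateException("Unknown store " + storeName);
            }
            connections.computeIfAbsent(processor, p -> new HashSet<>()).add(storeName);
        }

        // Models stream.transform(supplier, storeNames...): a StoreProvider gets
        // automatic add+connect; combining it with explicit names is rejected up front.
        void transform(String processor, Object supplier, String... storeNames) {
            if (supplier instanceof StoreProvider provider) {
                if (storeNames.length > 0) {
                    throw new IllegalArgumentException(
                        "Do not mix a StoreProvider with explicit store names");
                }
                for (StoreDef def : provider.stores()) {
                    addStateStore(def);
                    connect(processor, def.name());
                }
            } else {
                for (String name : storeNames) {
                    connect(processor, name); // old way: store was added manually
                }
            }
        }
    }
}
```

Because the interface is opt-in rather than a default method on the supplier, existing suppliers keep their current behavior, which is the backward-compatibility argument made in the thread.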
> >>>> My argument is that more public APIs mean a longer learning curve for users and introduce more usage patterns that may confuse them (the proposal I had tries to replace one pattern with the other completely).
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>
> >>>> Thanks for the great thoughts, Matthias and Guozhang!
> >>>>
> >>>> If I'm not mistaken, Guozhang's suggestion is my second alternative on the KIP ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StoreBuilder.
> >>>>
> >>>> I'm really intrigued by Matthias's idea that forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea, and I think it makes for a cleaner API than what I proposed. The non-shared store use case is handled well here, and the shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to me).
> >>>>
> >>>> We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky.
> >>>> Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added. Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most of them at least (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety, but a good error message should ease the pain, and the error would surface immediately during development.
> >>>>
> >>>> And with regard to Matthias's comment about whether we need to deprecate the existing varargs transform methods: I don't think we need to, but it might be nice for there to be only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.
> >>>>
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>
> >>>> Guozhang,
> >>>>
> >>>> > Regarding the last option to catch the "store already exists" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
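Paul's idempotent `addStateStore()` idea above hinges on comparing builder identity rather than names alone. A minimal sketch, assuming hypothetical `StoreBuilderStub` and `Registry` classes in place of the real `StoreBuilder` and `Topology`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a relaxed addStateStore(); not the Kafka Streams implementation.
final class IdempotentAddDemo {

    // Stand-in for a StoreBuilder: only the name matters for this sketch.
    static final class StoreBuilderStub {
        final String name;
        StoreBuilderStub(String name) { this.name = name; }
    }

    static final class Registry {
        private final Map<String, StoreBuilderStub> stores = new HashMap<>();

        void addStateStore(StoreBuilderStub builder) {
            StoreBuilderStub existing = stores.putIfAbsent(builder.name, builder);
            // Identity (==), not equals(): re-adding the exact same instance is a no-op,
            // while a different builder that reuses the name is still rejected.
            if (existing != null && existing != builder) {
                throw new IllegalStateException(
                    "Store " + builder.name + " was already added with a different builder");
            }
        }

        int size() { return stores.size(); }
    }
}
```

Using `==` is the design choice that separates this from the catch-and-fallback option Guozhang is worried about: a second, distinct builder that happens to reuse a name still fails loudly instead of being silently shared.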
> >>>> I agree with this concern. From my original email:
> >>>>
> >>>> > The only disadvantage I see might be potential bugs from sharing state if two different stores are given the same name by mistake (this would not be detected).
> >>>>
> >>>> For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
> >>>>
> >>>> Matthias,
> >>>>
> >>>> Thanks for your feedback.
> >>>>
> >>>> Regarding the last option to catch the "store already exists" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
> >>>>
> >>>> Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e.
> >>>> we still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of the state stores that it needs, i.e.:
> >>>>
> >>>>     public interface TransformerSupplier<K, V, R> {
> >>>>         Transformer<K, V, R> get();
> >>>>
> >>>>         default List<String> stateStoreNames() {
> >>>>             return Collections.emptyList();
> >>>>         }
> >>>>     }
> >>>>
> >>>> By doing this, users can still "consolidate" the references to store names in a single place in the transform call, e.g.:
> >>>>
> >>>>     public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
> >>>>         private final String storeName;
> >>>>
> >>>>         public MyTransformerSupplier(String storeName) {
> >>>>             this.storeName = storeName;
> >>>>         }
> >>>>
> >>>>         public class MyTransformer implements Transformer<K, V, R> {
> >>>>             private StateStore store;
> >>>>
> >>>>             @Override
> >>>>             public void init(ProcessorContext context) {
> >>>>                 // the same name that stateStoreNames() reports
> >>>>                 store = context.getStateStore(storeName);
> >>>>             }
> >>>>
> >>>>             @Override
> >>>>             public R transform(K key, V value) { return null; } // .... logic omitted
> >>>>
> >>>>             @Override
> >>>>             public void close() {}
> >>>>         }
> >>>>
> >>>>         @Override
> >>>>         public Transformer<K, V, R> get() {
> >>>>             return new MyTransformer();
> >>>>         }
> >>>>
> >>>>         @Override
> >>>>         public List<String> stateStoreNames() {
> >>>>             return Collections.singletonList(storeName);
> >>>>         }
> >>>>     }
> >>>>
> >>>> Basically, we move the parameters from the caller of `transform` into the TransformerSuppliers. DSL implementations would not change much, simply calling `connectStateStore` with the list of names obtained from the provided function.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>
> >>>> Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?
> >>>>
> >>>> The last argument is a vararg, and thus just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?
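Guozhang's alternative above (the supplier lists store names, while the stores themselves are still added separately) can be modeled as follows. `NamedSupplier` and `connectAll()` are hypothetical; `connectAll()` only records the connect calls the DSL would make rather than calling a real topology, and `MySupplier` supplies a placeholder `String` instead of a real `Transformer`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the "supplier lists its store names" proposal; not Kafka API.
final class StoreNamesDemo {

    interface NamedSupplier<T> extends Supplier<T> {
        // Default: no stores, so existing suppliers behave exactly as before.
        default List<String> stateStoreNames() {
            return Collections.emptyList();
        }
    }

    // What transform() would do internally: one connect call per listed name.
    static List<String> connectAll(String processor, NamedSupplier<?> supplier) {
        List<String> connected = new ArrayList<>();
        for (String name : supplier.stateStoreNames()) {
            connected.add(processor + "->" + name); // stands in for connecting the store
        }
        return connected;
    }

    // Receives the store name once and reports it; a real Transformer would also
    // use the same name for its store lookup in init().
    static final class MySupplier implements NamedSupplier<String> {
        private final String storeName;
        MySupplier(String storeName) { this.storeName = storeName; }

        @Override public String get() { return "transformer using " + storeName; } // placeholder

        @Override public List<String> stateStoreNames() {
            return Collections.singletonList(storeName);
        }
    }
}
```

Since only names are returned, the store must already have been added elsewhere, which is why this variant keeps `addStateStore` for shared stores but cannot encapsulate a non-shared store's full definition.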
> >>>> Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with a method `List<StoreBuilder> stateStores()` -- users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once; in this case, we require that users don't provide store names in `transform()`.
> >>>>
> >>>> Similarly, we could add an interface `StoreNameSupplier` with a method `List<String> stateStores()`. This allows "auto-wiring" a transformer to existing stores (to avoid the issue of adding the same store multiple times).
> >>>>
> >>>> Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
> >>>>
> >>>> Another possibility to avoid the issue of adding the same stores multiple times would be for the DSL to always call `addStateStore()` but catch a potential "store already exists" exception and fall back to `connectProcessorAndStateStore()` in that case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either. The only disadvantage I see might be potential bugs from sharing state if two different stores are given the same name by mistake (this would not be detected).
> >>>>
> >>>> Just some ideas I wanted to share. What do you think?
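The "always add, fall back to connect" option just described, and the undetected-bug risk Matthias names, can be shown with a small hypothetical model. `Topo`, `StoreDef` and `addOrConnect()` are illustrative stand-ins, not Kafka Streams API; the `persistent` flag is an invented field used only to make two same-named definitions distinguishable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of catch-and-fallback; not the Kafka Streams implementation.
final class FallbackDemo {

    record StoreDef(String name, boolean persistent) {}

    static final class Topo {
        final Map<String, StoreDef> stores = new HashMap<>();

        void addStateStore(StoreDef def) {
            if (stores.containsKey(def.name())) {
                throw new IllegalStateException("Store " + def.name() + " already exists");
            }
            stores.put(def.name(), def);
        }

        // Returns the definition the processor actually ends up connected to.
        StoreDef addOrConnect(StoreDef def) {
            try {
                addStateStore(def);
            } catch (IllegalStateException alreadyAdded) {
                // Fallback: connect to whichever store already owns this name,
                // even if its definition differs from the one requested.
            }
            return stores.get(def.name());
        }
    }
}
```

If a second, in-memory definition accidentally reuses the name of a persistent one, `addOrConnect()` hands back the persistent store and nothing signals the collision, which is exactly the kind of hidden bug the thread is concerned about.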
> >>>> -Matthias
> >>>>
> >>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
> >>>>
> >>>> Ah yes, of course, this was an oversight: I completely ignored the case of multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself, and I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible. Here's a gist showing the rough structure, just for context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65. Note how it adds the stores to the topology, as well as providing a public method with the store names.
> >>>>
> >>>> I don't think my proposal completely conflicts with the use case of multiple processors sharing state stores, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So for your example, if there were a single TransformerSupplier passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.
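Paul's enclosing-class pattern can be sketched roughly as below. The gist's actual contents are not reproduced here; `TopologyLike`, `DedupStage`, `RecordingTopology` and the processor/store names are hypothetical, chosen only to show one class adding a shared store exactly once and exposing the store names that its processors connect to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of grouping processors that share a store in one class.
final class EnclosingClassDemo {

    // Minimal stand-in for the parts of a topology this pattern touches.
    interface TopologyLike {
        void addStateStore(String storeName);
        void addProcessor(String processorName, List<String> storeNames);
    }

    static final class DedupStage {
        static final String STORE = "dedup-store";

        // The single place where the shared store is added; both processors reuse it.
        void addTo(TopologyLike topology) {
            topology.addStateStore(STORE);
            topology.addProcessor("dedup-left", storeNames());
            topology.addProcessor("dedup-right", storeNames());
        }

        // Public accessor so callers never hard-code the store name themselves.
        List<String> storeNames() {
            return List.of(STORE);
        }
    }

    // Records calls so the resulting wiring can be inspected.
    static final class RecordingTopology implements TopologyLike {
        final List<String> calls = new ArrayList<>();

        @Override public void addStateStore(String storeName) {
            calls.add("add:" + storeName);
        }

        @Override public void addProcessor(String processorName, List<String> storeNames) {
            calls.add("proc:" + processorName + storeNames);
        }
    }
}
```

Because the enclosing class owns the single `addStateStore` call, the "added twice" problem described above cannot arise within the group.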
> >>>> Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
> >>>>
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>> Hello Paul,
> >>>>
> >>>> Thanks for the great writeup (very detailed and crystal-clear motivation section!).
> >>>>
> >>>> This is quite an interesting idea, and I do like the API cleanness you proposed. The original motivation for letting StreamsTopology add state stores, though, is to allow different processors to share a state store. For example:
> >>>>
> >>>>     builder.addStore("store1");
> >>>>
> >>>>     // a path of stream transformations that leads to KStream stream1
> >>>>     stream1.transform(..., "store1");
> >>>>
> >>>>     // another path that generates a KStream stream2
> >>>>     stream2.transform(..., "store1");
> >>>>
> >>>> Behind the scenes, Streams will make sure the stream1 / stream2 transformations are always grouped together into a single group of tasks, each of which is executed by a single thread, and hence there are no concurrency issues when accessing the store from different operators within the same task.
> >>>> I'm not sure how common this use case is, but I'd like to hear if you have any thoughts on preserving it, since the current proposal seems to exclude this possibility.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>
> >>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level Processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.
> >>>>
> >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >>>>
> >>>> Thanks!
> >>>> Paul
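Guozhang's point that processors sharing a store end up in the same group of tasks can be illustrated with a union-find (disjoint-set) structure over processor-store connections. This is a simplified model for intuition, not Kafka's actual implementation; `StoreGroupingDemo` and its node-naming scheme are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model: processors connected to the same store are merged into one group,
// so a single task (and thread) would run all of them.
final class StoreGroupingDemo {

    private final Map<String, String> parent = new HashMap<>();

    // Finds the group representative, compressing the path along the way.
    private String find(String x) {
        parent.putIfAbsent(x, x);
        String p = parent.get(x);
        if (!p.equals(x)) {
            p = find(p);
            parent.put(x, p);
        }
        return p;
    }

    private void union(String a, String b) {
        String ra = find(a);
        String rb = find(b);
        if (!ra.equals(rb)) {
            parent.put(ra, rb);
        }
    }

    // Each connection links a processor node to a store node.
    void connect(String processor, String store) {
        union("processor:" + processor, "store:" + store);
    }

    boolean sameGroup(String processorA, String processorB) {
        return find("processor:" + processorA).equals(find("processor:" + processorB));
    }
}
```

With `transform-1` and `transform-2` both connected to `store1`, the model places them in one group, while a processor connected only to `store2` stays separate, mirroring the stream1/stream2 example in Guozhang's message.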