I definitely don't mind anyone jumping in, Bruno, thanks for the comments!

1) I'm not totally sure I'm clear on your point, but I think we're on the
same page - if we're adding a method to the XSupplier interfaces (by making
them inherit from a super interface StateStoreConnector) then we definitely
need a default implementation to maintain compatibility. Whether the
default implementation returns null or an empty list is somewhat of a
detail.
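
To make the compatibility argument in 1) concrete, here is a minimal sketch. It deliberately does not use the real Kafka Streams types: `StoreBuilder`, `StateStoreConnector` and `ProcessorSupplier` below are simplified, hypothetical stand-ins, and `storesToAdd` is an invented helper that only mimics the kind of check a caller might make.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for Kafka Streams' StoreBuilder; only the name matters here.
interface StoreBuilder {
    String name();
}

// Stand-in for the proposed super-interface. The default body is what keeps
// pre-existing implementations compiling.
interface StateStoreConnector {
    default Set<StoreBuilder> stateStores() {
        return null; // an empty set would work just as well for this purpose
    }
}

// Stand-in for an existing XSupplier interface that now inherits the new method.
interface ProcessorSupplier<P> extends StateStoreConnector {
    P get();
}

// Written "before" the change: it never mentions stateStores() and still compiles.
class LegacySupplier implements ProcessorSupplier<String> {
    @Override
    public String get() {
        return "legacy-processor";
    }
}

// Opts in to the new behaviour by overriding the default.
class StoreOwningSupplier implements ProcessorSupplier<String> {
    @Override
    public String get() {
        return "store-owning-processor";
    }

    @Override
    public Set<StoreBuilder> stateStores() {
        StoreBuilder counts = () -> "counts";
        return Collections.singleton(counts);
    }
}

class DefaultMethodDemo {
    // Invented helper: null and empty are both treated as "nothing to add".
    static int storesToAdd(StateStoreConnector connector) {
        Set<StoreBuilder> stores = connector.stateStores();
        return (stores == null || stores.isEmpty()) ? 0 : stores.size();
    }

    public static void main(String[] args) {
        System.out.println(storesToAdd(new LegacySupplier()));
        System.out.println(storesToAdd(new StoreOwningSupplier()));
    }
}
```

As long as the caller guards against both null and empty, the choice between the two only affects how loudly a mistake in user code shows up.
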
2) If stream.process() sees that StateStoreConnector#stateStores() returns
either null or an empty list, it would handle that case specifically and
not try to call addStateStore at all. Or is this not what you're asking?

Separately, I'm still hacking away at the details of the PR and will
continue to get something into a discussable state, but I'll share some
thoughts I've run into.

A) I'm tentatively going the separate interface route (Matthias's
suggestion) and naming it ConnectedStoreProvider. Still don't love the
name, but there's something nice about the name indicating *why* this thing
is providing the store, not just that it is providing it.

B) It has occurred to me that topology.addProcessor() could also recognize
if ProcessorSupplier implements ConnectedStoreProvider and add and connect
stores appropriately. This isn't in the KIP and I think the value-add is
lower (if you're reaching that low level, surely the "auto add/connect
store" isn't too important to you), but I think it would be confusing if it
didn't, and I don't see any real downside.

Paul

On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi,
>
> @Paul: Thank you for the KIP!
>
> I hope you do not mind that I jump in.
>
> I have the following comments:
>
> 1) `null` vs empty list in the default implementation
> IIUC, returning `null` in the default implementation should basically
> signal that the method `stateStores` was not overridden. Why then provide a
> default implementation in the first place? Without a default implementation
> you would discover the missing implementation already at compile-time and
> not only at runtime. If you decide not to provide a default implementation,
> `XSupplier extends StateStoreConnector` would break existing code as
> Matthias has already pointed out.
>
> 2) `process` method adding the StoreBuilders to the topology
> If the default implementation returned `null` and `XSupplier extends
> StateStoreConnector`, then existing code would break, because
> `StreamsBuilder#addStateStore()` would throw an NPE.
>
> +1 for opening a WIP PR
>
> Best,
> Bruno
>
>
> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks Paul!
> >
> > I agree with all of that. If we think that the general design is good,
> > refactoring a PR if we want to pick a different name should not be too
> > much additional work (hopefully). Thus, if you want to open a WIP PR and
> > we use it to nail the open details, it might help to find a good
> > conclusion.
> >
> > >> 2) Default method vs new interface:
> >
> > This seems to be the hardest tradeoff. I see the point about
> > discoverability... Might be good to get input from others on which
> > version they would prefer.
> >
> > Just to make clear, my suggestion from the last email was that
> > `Transformer` etc. does not extend the new interface. Instead, a user
> > that wants to use this feature would need to implement both interfaces.
> >
> > If `Transformer extends StoreProvider` (just picking a name here)
> > without a default implementation, existing code would break, and thus it
> > is not an option because it breaks backward compatibility.
> >
> >
> > -Matthias
> >
> > On 4/28/19 8:37 PM, Paul Whalen wrote:
> > > Great thoughts Matthias, thanks! I think we're all agreed that naming
> > > and documentation/education are the biggest hurdles for this KIP, and
> > > in light of that, I think it makes sense for me to just take a stab at
> > > a full fledged PR with documentation to convince us that it's possible
> > > to do it with enough clarity.
> > >
> > > In response to your specific thoughts:
> > >
> > > 1) StateStoreConnector as a name: Really good point about defining the
> > > difference between "adding" and "connecting."
> > > Guozhang suggested StateStoreConnector which was definitely an
> > > improvement over my StateStoresSupplier, but I think you're right that
> > > we need to be careful to make it clear that it's really accomplishing
> > > both. Thinking about it now, one problem with Connector is that the
> > > implementer of the interface is not really doing any connecting, it's
> > > providing/supplying the store that will be both added and connected.
> > > StoreProvider seems reasonable to me and probably the best candidate
> > > at the moment, but it would be nice if the name could convey that it's
> > > providing the store specifically so the caller can add it to the
> > > topology and connect it to the associated transformer.
> > >
> > > In general I think that really calling out what "adding" versus
> > > "connecting" is in the documentation will help make the entire purpose
> > > of this feature more clear to the user.
> > >
> > > 2) Default method vs new interface: The choice of a default method was
> > > influenced by Guozhang's fear about API bloat/discoverability. I can
> > > definitely see it both ways. Would the separate interface be a
> > > sub-interface of Processor/TransformerSupplier or standalone? It seems
> > > like you're suggesting standalone and I think that's what I favor. My
> > > only concern there is that the interface wouldn't actually be a type
> > > in any public API, which sort of hurts discoverability. You would have
> > > to read the javadocs for stream.process/transform() to discover that
> > > implementing the interface in addition to
> > > Processor/TransformerSupplier would add and connect the store for you.
> > > But that added burden actually probably helps us in terms of making
> > > sure people don't mix and match, like you said.
> > >
> > > 3) Returning null instead of empty: Seems fair to me.
> > > I always worry about returning null when an empty collection can be
> > > used instead, but given that the library is the caller rather than the
> > > client I think your point makes sense here.
> > >
> > > 4) Returning Set instead of Collection: Agreed, don't see why not to
> > > make it more specific.
> > >
> > > Paul
> > >
> > > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> Hi, sorry for the long pause. Just trying to catch up here.
> > >>
> > >> I think it is safe to allow `addStateStore()` to be idempotent for the
> > >> same `StoreBuilder` object. In fact, the `name` is "hard coded" and
> > >> thus it's not really possible to use the same `StoreBuilder` object to
> > >> create different stores.
> > >>
> > >> I also agree with the concern that only allowing a single store (as
> > >> proposed by Ivan) might be too restrictive.
> > >>
> > >> Overall, the current KIP version LGTM. I don't have major concerns
> > >> about user education for this case, but I agree that we need to
> > >> document this clearly.
> > >>
> > >> Some further comments:
> > >>
> > >> (1) I am not sure if `StateStoreConnector` is the best name for the
> > >> new interface. Note that there are two concepts about stores:
> > >>
> > >> - adding a store: this makes the store available in the topology in
> > >>   general (however, the store is still "dangling", and not used)
> > >> - connecting a store: this allows a processor etc. to use a store
> > >>
> > >> The new interface does both, but its name only indicates the second
> > >> part, which might be confusing. It might be especially confusing
> > >> because we want to disallow mixing the existing "manually add and
> > >> connect" pattern with the new "auto add+connect" pattern. If the new
> > >> interface name indicates the connect part only, users might think they
> > >> need to add stores manually and can connect automatically.
> > >>
> > >> Unfortunately, I don't have a much better suggestion for a name
> > >> either. The only idea that came to my mind was `StoreProvider`: to me,
> > >> a provider is a "service" interface that does work for us, i.e., it
> > >> adds and connects a store. Not sure if this is too subtle, if we
> > >> consider that there is already the `StoreSupplier` interface?
> > >>
> > >> But maybe somebody else might still have a good idea on how to improve
> > >> the name.
> > >>
> > >> In any case, I would suggest shortening the name to `StoreConnector`
> > >> instead of `StateStoreConnector`, because we also have `StoreSupplier`
> > >> and `StoreBuilder`.
> > >>
> > >>
> > >>
> > >> (2) The KIP proposes to add the new interface to `ProcessorSupplier`
> > >> etc. and to add a default implementation for the new method. Hence,
> > >> users would need to override this default implementation to opt in to
> > >> the feature. I am wondering if it might be better to not add the new
> > >> interface to `ProcessorSupplier` etc. and to just provide a new
> > >> interface with no default implementation. Users would opt in by adding
> > >> the interface explicitly to their existing `ProcessorSupplier`
> > >> implementation. Overriding a default method and getting different
> > >> behavior seems to be a little subtle to me, especially because we
> > >> don't want to allow mixing and matching the old and new approaches.
> > >> Think: I only override a default method and my code breaks.
> > >>
> > >> Thoughts?
> > >>
> > >>
> > >>
> > >> (3) If we keep the current default implementation for the new method,
> > >> I am wondering if it should return `null` instead of an empty
> > >> collection? This might be safer for detecting bugs in user code in
> > >> which, by accident, an empty collection could be returned.
> > >>
> > >>
> > >>
> > >> (4) Should the new method return a `Set` instead of a `Collection` to
> > >> indicate the semantics clearly (i.e., returning the same `StoreBuilder`
> > >> multiple times is idempotent and one cannot add+connect to it twice)?
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >>
> > >> On 4/6/19 12:27 PM, Paul Whalen wrote:
> > >>> Ivan and Guozhang,
> > >>>
> > >>> Thanks for the thoughts! Ivan's use case is definitely interesting.
> > >>> The way I see it, if we can achieve the main goal of the KIP
> > >>> (allowing Processor/TransformerSuppliers to encapsulate their usage
> > >>> of state stores), we will enable this kind of thing in "user space"
> > >>> very easily.
> > >>>
> > >>> I will say that I'm not totally sure that most use cases of
> > >>> transform() use just one state store. It's hard to know since I
> > >>> haven't seen many examples in public, but my team's usages almost
> > >>> exclusively require multiple state stores. We only reach for the low
> > >>> level processor API when we need that complexity, and it's somewhat
> > >>> hard to imagine many use cases that only need one state store, since
> > >>> the high level DSL can usually accomplish those tasks. The example
> > >>> Ivan presented for instance looks like a
> > >>> stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what
> > >>> sort of other usages you're imagining.
> > >>>
> > >>> That being said, perhaps the Processor API should really just be
> > >>> considered a separate paradigm in Streams, not just a lower level
> > >>> that we reach to when necessary. In which case it would be beneficial
> > >>> to make the simple use cases easier. I've definitely talked about
> > >>> this with my own team - if you're less familiar with the kind of
> > >>> functional style that the high level DSL offers, it might be easier
> > >>> to "see" your state and interact with it directly.
> > >>>
> > >>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's
> > >>> suggestions. It seems like there is at least some interest in that on
> > >>> its own and not a ton of pushback, so I think I will try to start a
> > >>> vote.
> > >>>
> > >>> Paul
> > >>>
> > >>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru>
> > >>> wrote:
> > >>>
> > >>>> Hi all!
> > >>>>
> > >>>> I was about to write another KIP, but found out that KIP-401
> > >>>> addresses exactly the problem I faced. So let me jump into your
> > >>>> discussion and ask you to assess another idea.
> > >>>>
> > >>>> I fully agree with KIP-401's motivation part. E.g., in my project I
> > >>>> had to invent a wrapper class that hides the details of
> > >>>> KeyValueStore management from business logic. Of course this should
> > >>>> be done better in the KStreams API.
> > >>>>
> > >>>> But I was about to look at this problem from another side and
> > >>>> propose a simple alternative in the high-level DSL, one that will
> > >>>> not fit all the cases, but most of them. Hence my idea does not
> > >>>> exclude Paul's proposal.
> > >>>>
> > >>>> What if we restrict ourselves to *only one* KeyValueStore and
> > >>>> propose a method that resembles the `aggregate` and `reduce`
> > >>>> methods, like this:
> > >>>>
> > >>>> stream
> > >>>>   .map(...)
> > >>>>   .filter(...)
> > >>>>   .transform ((k, v, s)->{....}, Transformed.with(....))
> > >>>>
> > >>>> where
> > >>>> * k, v -- input key & value
> > >>>> * s -- a KeyValueStore provided as an argument
> > >>>> * return value of the lambda should be KeyValue.pair(...)
> > >>>> * Transformed.with... is a builder, used in order to define the
> > >>>>   Transformer and KeyValueStore building parameters.
> > >>>>   Some of these parameters should be:
> > >>>> ** store's KeySerde,
> > >>>> ** store's ValueSerde,
> > >>>> ** whether the store is persistent or in-memory,
> > >>>> ** store's name -- optional parameter, the system should be able to
> > >>>>    devise the name of the store transparently for the user, if we
> > >>>>    don't want to devise it ourselves/share the store between
> > >>>>    processors,
> > >>>> ** scheduled punctuation.
> > >>>>
> > >>>> Imagine we have a KStream<String, Integer>, and we need to calculate
> > >>>> a `derivative` stream, that is, a stream of 'deltas' of the provided
> > >>>> integer values.
> > >>>>
> > >>>> This could be achieved as simply as
> > >>>>
> > >>>> stream.transform((key, value, stateStore) -> {
> > >>>>         int previousValue =
> > >>>>             Optional.ofNullable(stateStore.get(key)).orElse(0);
> > >>>>         stateStore.put(key, value);
> > >>>>         return KeyValue.pair(key, value - previousValue);
> > >>>>     }
> > >>>>     // we do not need to bother with store name, punctuation etc.
> > >>>>     // maybe even the Serde part can be omitted, since we can
> > >>>>     // inherit the serdes from the stream by default
> > >>>>     , Transformed.with(Serdes.String(), Serdes.Integer())
> > >>>> )
> > >>>>
> > >>>> The hard part of it is that the new `transform` method definition
> > >>>> should be parameterized by six type parameters:
> > >>>>
> > >>>> * input/output/KeyValueStore key type,
> > >>>> * input/output/KeyValueStore value type.
> > >>>>
> > >>>> However, it seems that all these types can be inferred from the
> > >>>> provided lambda and Transformed.with instances.
> > >>>>
> > >>>> What do you think about this?
> > >>>>
> > >>>> Regards,
> > >>>>
> > >>>> Ivan
> > >>>>
> > >>>>
> > >>>> On 27.03.2019 20:45, Guozhang Wang wrote:
> > >>>>
> > >>>> Hello Paul,
> > >>>>
> > >>>> Thanks for the uploaded PR and the detailed description! I've made a
> > >>>> pass on it and left some comments.
> > >>>>
> > >>>> Overall I think I agree with you that passing in the StoreBuilder
> > >>>> directly rather than the store name is more convenient, as it does
> > >>>> not require another `addStore` call, but we just need to spend some
> > >>>> more documentation effort on educating users about the two ways of
> > >>>> connecting their stores. I'm slightly concerned about this education
> > >>>> curve but I can be convinced if most people feel it is worth it.
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>
> > >>>> I'd like to resurrect this discussion with a cursory,
> > >>>> proof-of-concept implementation of the KIP which combines many of
> > >>>> our ideas: https://github.com/apache/kafka/pull/6496. I tried to
> > >>>> keep the diff as small as possible for now, just using it to convey
> > >>>> the main ideas. But I'll separately address some of our earlier
> > >>>> discussion:
> > >>>>
> > >>>> - Will there be a new, separate interface for users to implement for
> > >>>>   the new functionality? No, to hopefully keep things simple, all of
> > >>>>   the Processor/TransformerSupplier interfaces will just extend
> > >>>>   StateStoresSupplier, allowing users to opt in to this
> > >>>>   functionality by overriding the default implementation that gives
> > >>>>   an empty list.
> > >>>> - Will the interface allow users to specify the store name, or the
> > >>>>   entire StoreBuilder? The entire StoreBuilder, so the
> > >>>>   Processor/TransformerSupplier can completely encapsulate name and
> > >>>>   implementation of a state store if desired.
> > >>>> - Will the old way of specifying store names alongside the supplier
> > >>>>   when calling stream.process/transform() be deprecated? No, this is
> > >>>>   still a legitimate way to wire up Processors/Transformers and
> > >>>>   their stores.
> > >>>>   But I would recommend not allowing stream.process/transform()
> > >>>>   calls that use both store declaration mechanisms (this restriction
> > >>>>   is not in the proof of concept).
> > >>>> - How will we handle adding the same state store to the topology
> > >>>>   multiple times because different Processor/TransformerSuppliers
> > >>>>   declare it? topology.addStateStore() will be slightly relaxed for
> > >>>>   convenience, and will allow adding the same StoreBuilder multiple
> > >>>>   times as long as the exact same StoreBuilder instance is being
> > >>>>   added for the same store name. This seems to prevent in practice
> > >>>>   the issue of accidentally making two state stores one by adding
> > >>>>   with the same name. For additional safety, if we wanted to (not in
> > >>>>   the proof of concept), we could allow for this relaxation only for
> > >>>>   internal callers of topology.addStateStore().
> > >>>>
> > >>>> So, in summary, the use cases look like:
> > >>>>
> > >>>> - 1 transformer/processor that owns its store: Using the new
> > >>>>   StateStoresSupplier interface method to supply its StoreBuilders
> > >>>>   that will be added to the topology automatically.
> > >>>> - Multiple transformer/processors that share the same store: Either
> > >>>>   1. The old way: the StoreBuilder is defined "far away" from the
> > >>>>      Transformer/Processor implementations, and is added to the
> > >>>>      topology manually by the user
> > >>>>   2. The new way: the StoreBuilder is defined closer to the
> > >>>>      Transformer/Processor implementations, and the same instance is
> > >>>>      returned by all Transformer/ProcessorSuppliers that need it
> > >>>>
> > >>>>
> > >>>> This makes the KIP wiki a bit stale; I'll update if we want to bring
> > >>>> this design to a vote.
> > >>>>
> > >>>> Thanks!
> > >>>> Paul
> > >>>>
> > >>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>
> > >>>> Matthias / Paul,
> > >>>>
> > >>>> The concern I had about introducing `StoreBuilderSupplier` is simply
> > >>>> because it is another XXSupplier added to the public API, so I'd
> > >>>> like to ask if we really have to add it :)
> > >>>>
> > >>>> The difference between encapsulating the store name and
> > >>>> encapsulating the full state store builder is that, in the former:
> > >>>>
> > >>>> -----------
> > >>>>
> > >>>> String storeName = "store1";
> > >>>> builder.addStore(new MyStoreBuilder(storeName));
> > >>>> // following my proposal, the store name can be passed in and used
> > >>>> // for both `listStores` and in `Transformer#init`, so the
> > >>>> // Transformer function does not need to get the constant string
> > >>>> // name again.
> > >>>> stream1.transform(new MyTransformerSupplier(storeName));
> > >>>>
> > >>>> // one caveat to admit is that the MyTransformerSupplier logic may
> > >>>> // be unique to `store1`, so it cannot be reused with a different
> > >>>> // store name anyway.
> > >>>> -----------
> > >>>>
> > >>>> While in the latter:
> > >>>>
> > >>>> -----------
> > >>>>
> > >>>> // the name is just indicating that we may have one such supplier
> > >>>> // for each store.
> > >>>> stream1.transform(new MyTransformerSupplierForStore1());
> > >>>>
> > >>>> -----------
> > >>>>
> > >>>> I understand the latter introduces more convenience in the API, but
> > >>>> the cost is that we still cannot completely remove
> > >>>> `builder.addStore`, but only reduce its semantic scope to shared
> > >>>> state stores; hence users need to learn two ways of creating state
> > >>>> stores for those two patterns.
> > >>>>
> > >>>> My argument is that more public APIs require a longer learning curve
> > >>>> for users, and introduce more usage patterns that may confuse users
> > >>>> (the proposal I had tries to replace one with another completely).
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>
> > >>>> Thanks for the great thoughts Matthias and Guozhang!
> > >>>>
> > >>>> If I'm not mistaken, Guozhang's suggestion is what my second
> > >>>> alternative on the KIP is ("Have the added method on the Supplier
> > >>>> interfaces only return store names, not builders"). I do think it
> > >>>> would be a worthwhile usability improvement on its own, but to
> > >>>> Matthias's point, it doesn't achieve the full goal of completely
> > >>>> encapsulating a state store and its processor - it encapsulates the
> > >>>> name, but not the StateStoreBuilder.
> > >>>>
> > >>>> I'm really intrigued by Matthias's idea that forgoes the default
> > >>>> interface method I proposed. Having smaller, separate interfaces is
> > >>>> a powerful idea and I think a cleaner API than what I proposed. The
> > >>>> non-shared store use case is handled well here, and the shared store
> > >>>> use case is possible, though maybe still not as graceful as we would
> > >>>> like (having to add the StoreBuilderSupplier before the
> > >>>> StoreNameSupplier seems maybe too subtle to me).
> > >>>>
> > >>>> We're all agreed that one of the big problems with the shared store
> > >>>> use case is how to deal with adding the same store to the topology
> > >>>> multiple times.
> > >>>> Catching the "store already added" exception is risky. Here's a
> > >>>> maybe radical idea: change `topology.addStateStore()` to be
> > >>>> idempotent for adding a given state store name and `StoreBuilder`.
> > >>>> In other words, `addStateStore` would not throw the "store already
> > >>>> added" exception if the `StoreBuilder` being added for a given name
> > >>>> has the same identity as the one that has already been added. Does
> > >>>> this eliminate all the bugs we're worried about? Thinking about it
> > >>>> for a few minutes, it seems to eliminate most at least (would a user
> > >>>> really use the exact same StoreBuilder when they intend there to be
> > >>>> two stores?). It might make the API slightly harder to use if a user
> > >>>> isn't immediately aware of that subtlety, but a good error message
> > >>>> should ease the pain, and it would happen immediately during
> > >>>> development.
> > >>>>
> > >>>> And with regards to Matthias's comment about whether we need to
> > >>>> deprecate existing varargs transform methods - I don't think we need
> > >>>> to, but it might be nice for there only to be one way to do things,
> > >>>> assuming whatever we come up with supports all existing use cases.
> > >>>> I don't feel strongly about this, but if we don't deprecate, I do
> > >>>> think it's important to add checks that prevent users from trying to
> > >>>> do the same thing in two different ways, as we've discussed.
> > >>>>
> > >>>> Paul
> > >>>>
> > >>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J.
> > >>>> Sax <matth...@confluent.io> wrote:
> > >>>>
> > >>>>
> > >>>> Guozhang,
> > >>>>
> > >>>>
> > >>>> Regarding the last option to catch "store exist already" exception
> > >>>> and fallback to connect stores, I'm a bit concerned it may be hiding
> > >>>> actual user bugs.
> > >>>>
> > >>>> I agree with this concern. From my original email:
> > >>>>
> > >>>>
> > >>>> The only disadvantage I see, might be potential bugs about sharing
> > >>>> state if two different stores are named the same by mistake (this
> > >>>> would not be detected).
> > >>>>
> > >>>> For your new proposal: I am not sure if it addresses Paul's original
> > >>>> idea -- I hope Paul can clarify. From my understanding, the idea was
> > >>>> to encapsulate a store and its processor. As many stores are not
> > >>>> shared, this seems to be quite useful. Your proposal falls a little
> > >>>> short of supporting encapsulation for non-shared stores.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>>
> > >>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
> > >>>>
> > >>>> Matthias,
> > >>>>
> > >>>> Thanks for your feedback.
> > >>>>
> > >>>> Regarding the last option to catch the "store exist already"
> > >>>> exception and fall back to connecting stores, I'm a bit concerned it
> > >>>> may be hiding actual user bugs.
> > >>>>
> > >>>> Thinking about Paul's proposal and your suggestion again, I'd like
> > >>>> to propose another alternative somewhere in the middle of your
> > >>>> approaches, i.e.
> > >>>> we still let users create sharable state stores via
> > >>>> `addStateStore`, and we allow the TransformerSupplier to return a
> > >>>> list of state stores that it needs, i.e.:
> > >>>>
> > >>>> public interface TransformerSupplier<K, V, R> {
> > >>>>     Transformer<K, V, R> get();
> > >>>>     default List<String> stateStoreNames() {
> > >>>>         return Collections.emptyList();
> > >>>>     }
> > >>>> }
> > >>>>
> > >>>> By doing this, users can still "consolidate" the references to store
> > >>>> names in a single place in the transform call, e.g.:
> > >>>>
> > >>>> public class MyTransformerSupplier<K, V, R>
> > >>>>         implements TransformerSupplier<K, V, R> {
> > >>>>     private String storeName;
> > >>>>
> > >>>>     public class MyTransformer<K, V, R> {
> > >>>>
> > >>>>         ....
> > >>>>
> > >>>>         init() {
> > >>>>             store = context.getStateStore(storeName);
> > >>>>         }
> > >>>>     }
> > >>>>
> > >>>>     // overrides the interface default with this supplier's store
> > >>>>     @Override
> > >>>>     public List<String> stateStoreNames() {
> > >>>>         return Collections.singletonList(storeName);
> > >>>>     }
> > >>>> }
> > >>>>
> > >>>> Basically, we move the parameters from the caller of `transform` to
> > >>>> inside the TransformSuppliers. DSL implementations would not change
> > >>>> much, simply calling `connectStateStore` by getting the list of
> > >>>> names from the provided function.
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax
> > >>>> <matth...@confluent.io> wrote:
> > >>>>
> > >>>>
> > >>>> Just a meta comment: do we really need to deprecate existing
> > >>>> `transform()` etc. methods?
> > >>>>
> > >>>> The last argument is a vararg, and thus, just keeping the existing
> > >>>> API for this part seems to work too, allowing both patterns to be
> > >>>> implemented?
> > >>>>
> > >>>> Also, instead of adding a default method, we could also add a new
> > >>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> > >>>> stateStores()` -- users could implement `TransformerSupplier` and
> > >>>> `StoreBuilderSupplier` at once; and for this case, we require that
> > >>>> users don't provide store names in `transform()`.
> > >>>>
> > >>>> Similarly, we could add an interface `StoreNameSupplier` with method
> > >>>> `List<String> stateStores()`. This allows us to "auto-wire" a
> > >>>> transformer to existing stores (to avoid the issue of adding the
> > >>>> same store multiple times).
> > >>>>
> > >>>> Hence, for shared stores, there would be one "main" transformer that
> > >>>> implements `StoreBuilderSupplier` and that must be added first to
> > >>>> the topology. The other transformers would implement
> > >>>> `StoreNameSupplier` and just connect to those stores.
> > >>>>
> > >>>> Another possibility to avoid the issue of adding the same stores
> > >>>> multiple times would be that the DSL always calls `addStateStore()`
> > >>>> but catches a potential "store exists already" exception and falls
> > >>>> back to `connectProcessorAndStateStore()` for this case. Thus, we
> > >>>> would not need the `StoreNameSupplier` interface and the order in
> > >>>> which transformers are added would not matter either.
> > >>>> The only disadvantage I see, might be potential bugs about sharing
> > >>>> state if two different stores are named the same by mistake (this
> > >>>> would not be detected).
> > >>>>
> > >>>>
> > >>>>
> > >>>> Just some ideas I wanted to share. What do you think?
> > >>>>
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
> > >>>>
> > >>>> Ah yes of course, this was an oversight, I completely ignored the
> > >>>> multiple processors sharing the same state store when writing up the
> > >>>> KIP. Which is funny, because I've actually done this (different
> > >>>> processors sharing state stores) a fair amount myself, and I've
> > >>>> settled on a pattern where I group the Processors in an enclosing
> > >>>> class, and that enclosing class handles as much as possible. Here's
> > >>>> a gist showing the rough structure, just for context:
> > >>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 .
> > >>>> Note how it adds the stores to the topology, as well as providing a
> > >>>> public method with the store names.
> > >>>>
> > >>>> I don't think my proposal completely conflicts with the multiple
> > >>>> processors sharing state stores use case, since you can create a
> > >>>> supplier that provides the store name you want, somewhat
> > >>>> independently of your actual Processor logic. The issue I do see
> > >>>> though, is that topology.addStateStore() can only be called once for
> > >>>> a given store.
> > >>>> So for your example, if there was a single TransformerSupplier that
> > >>>> was passed into both transform() calls, "store1" would be added
> > >>>> (under the hood) to the topology twice, which is no good.
> > >>>>
> > >>>> Perhaps this suggests that one of my alternatives on the KIP might be
> > >>>> desirable: either not having the suppliers return StoreBuilders (just
> > >>>> store names), or not deprecating the old methods that take
> > >>>> "String... stateStoreNames". I'll have to think about it a bit.
> > >>>>
> > >>>> Paul
> > >>>>
> > >>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> Hello Paul,
> > >>>>
> > >>>> Thanks for the great writeup (very detailed and crystal motivation
> > >>>> sections!).
> > >>>>
> > >>>> This is quite an interesting idea and I do like the API cleanness you
> > >>>> proposed. The original motivation of letting StreamsTopology add
> > >>>> state stores though, is to allow different processors to share the
> > >>>> state store. For example:
> > >>>>
> > >>>> builder.addStore("store1");
> > >>>>
> > >>>> // a path of stream transformations that leads to KStream stream1.
> > >>>> stream1.transform(..., "store1");
> > >>>>
> > >>>> // another path that generates a KStream stream2.
> > >>>> stream2.transform(..., "store1");
> > >>>>
> > >>>> Behind the scenes, Streams will make sure stream1 / stream2
> > >>>> transformations will always be grouped together as a single group of
> > >>>> tasks, each of which will be executed by a single thread and hence
> > >>>> there are no concurrency issues on accessing the store from different
> > >>>> operators within the same task. I'm not sure how common this use case
> > >>>> is, but I'd like to hear if you have any thoughts on maintaining this
> > >>>> since the current proposal seems to exclude this possibility.
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that
> > >>>> I think could greatly increase the usability of the low-level
> > >>>> processor API. I have some code written but will wait to see if there
> > >>>> is buy in before going all out and creating a pull request. It seems
> > >>>> like most of the work would be in updating documentation and tests.
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > >>>>
> > >>>> Thanks!
> > >>>> Paul
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
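For readers following the shared-store discussion quoted above, here is a rough sketch of the "always try to add the store, and connect instead if it already exists" fallback that Matthias floats. It is only an illustration under stated assumptions: `ToyTopology`, `SharedStoreSketch`, and `addOrConnect` are hypothetical names invented for this note, the toy class is not the Kafka Streams `Topology`, and `IllegalStateException` merely stands in for whatever "store exists already" exception the real API would raise.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a topology (hypothetical; NOT the Kafka Streams Topology class).
class ToyTopology {
    // Maps each store name to the names of the processors connected to it.
    private final Map<String, List<String>> connections = new LinkedHashMap<>();

    // Mirrors the constraint from the thread: a given store may be added only once.
    void addStateStore(String storeName, String processorName) {
        if (connections.containsKey(storeName)) {
            throw new IllegalStateException("store exists already: " + storeName);
        }
        List<String> processors = new ArrayList<>();
        processors.add(processorName);
        connections.put(storeName, processors);
    }

    // Connects a processor to a store that was added earlier.
    void connectProcessorAndStateStore(String processorName, String storeName) {
        List<String> processors = connections.get(storeName);
        if (processors == null) {
            throw new IllegalStateException("unknown store: " + storeName);
        }
        processors.add(processorName);
    }

    // Returns the processors connected to the store (empty if the store is unknown).
    List<String> processorsFor(String storeName) {
        return connections.getOrDefault(storeName, new ArrayList<>());
    }
}

class SharedStoreSketch {
    // Always try to add the store; if it already exists, fall back to connecting.
    static void addOrConnect(ToyTopology topology, String processorName, String storeName) {
        try {
            topology.addStateStore(storeName, processorName);
        } catch (IllegalStateException alreadyExists) {
            topology.connectProcessorAndStateStore(processorName, storeName);
        }
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        // Two transformers sharing "store1", registered in arbitrary order.
        addOrConnect(topology, "transformer1", "store1");
        addOrConnect(topology, "transformer2", "store1");
        System.out.println(topology.processorsFor("store1")); // prints [transformer1, transformer2]
    }
}
```

With this shape the registration order stops mattering, which is the benefit named in the thread. The drawback named there is also visible: two unrelated stores that happen to share a name would silently end up as one shared store, because the second "add" is quietly turned into a "connect".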