Hi Paul,

I will try to express myself a bit more clearly.

Ad 1) My assumption is that if `StateStoreConnector#stateStores()` returns `null`, Kafka Streams will throw an NPE. The reason is that no null check is performed, on purpose, before the loop that calls `StreamsBuilder#addStateStore()`. Once the user understands the cause of the NPE, she knows that she has to override `StateStoreConnector#stateStores()` in her implementation. My question was this: why let the user discover at runtime that she has to override the method? Instead, you could omit the default implementation for `StateStoreConnector#stateStores()` and let the compiler tell the user that the method needs to be overridden. Omitting the default implementation without separating the interfaces breaks backward compatibility. So if we choose not to provide a default implementation, and let the compiler signal that the method must be overridden, we have to separate the interfaces in any case.

Ad 2) Suppose you check for `null` or an empty list in `process` and do not call `addStateStore` in those cases. Then the advantage of returning `null` that Matthias mentioned, namely that it is safer for detecting bugs, would be lost. But maybe I am missing something here.

Best,
Bruno

On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> wrote:

I definitely don't mind anyone jumping in, Bruno, thanks for the comments!

1) I'm not totally sure I'm clear on your point, but I think we're on the same page. If we're adding a method to the XSupplier interfaces (by making them inherit from a super interface StateStoreConnector), then we definitely need a default implementation to maintain compatibility. Whether the default implementation returns null or an empty list is somewhat of a detail.

2) If stream.process() sees that StateStoreConnector#stateStores() returns either null or an empty list, it would handle that case specifically and not try to call addStateStore at all. Or is this not what you're asking?
Separately, I'm still hacking away at the details of the PR and will continue working to get it into a discussable state. In the meantime, I'll share some thoughts I've run into.

A) I'm tentatively going the separate interface route (Matthias's suggestion) and naming it ConnectedStoreProvider. I still don't love the name, but there's something nice about a name that indicates *why* this thing is providing the store, not just that it is providing it.

B) It has occurred to me that topology.addProcessor() could also recognize when a ProcessorSupplier implements ConnectedStoreProvider, and add and connect stores appropriately. This isn't in the KIP, and I think the value-add is lower (if you're reaching that low level, surely the "auto add/connect store" isn't too important to you). Still, I think it would be confusing if it didn't, and I don't see any real downside.

Paul

On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi,

@Paul: Thank you for the KIP!

I hope you do not mind that I jump in.

I have the following comments:

1) `null` vs. empty list in the default implementation
IIUC, returning `null` in the default implementation should basically signal that the method `stateStores` was not overridden. Why, then, provide a default implementation in the first place? Without a default implementation, you would discover the missing implementation at compile time rather than only at runtime. If you decide not to provide a default implementation, `XSupplier extends StateStoreConnector` would break existing code, as Matthias has already pointed out.

2) `process` method adding the StoreBuilders to the topology
If the default implementation returned `null` and `XSupplier extends StateStoreConnector`, then existing code would break, because `StreamsBuilder#addStateStore()` would throw an NPE.
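Here is a minimal sketch of the separate-interface route in A) and B), under stated assumptions. `StoreSpec`, `ProcessorLikeSupplier`, `MiniTopology` and `CountingSupplier` are hypothetical stand-ins, not Kafka Streams classes. `ConnectedStoreProvider` here only borrows the name chosen in the thread.

Because `stores()` has no default, a class that declares `implements ConnectedStoreProvider` without implementing the method fails to compile. That is the compile-time signal Bruno asks for. Existing suppliers are left untouched.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a StoreBuilder, identified by its store name.
final class StoreSpec {
    final String name;

    StoreSpec(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for ProcessorSupplier; unchanged, so existing code keeps compiling.
interface ProcessorLikeSupplier extends Supplier<Runnable> {
}

// Separate opt-in interface with no default: forgetting stores() is a compile error.
interface ConnectedStoreProvider {
    Set<StoreSpec> stores();
}

final class MiniTopology {
    final Map<String, StoreSpec> stores = new LinkedHashMap<>();
    final Map<String, Set<String>> connections = new LinkedHashMap<>();

    // Manual "add" step of the old pattern; a second add under the same name fails.
    void addStateStore(StoreSpec spec) {
        if (stores.putIfAbsent(spec.name, spec) != null) {
            throw new IllegalStateException("Store " + spec.name + " already added");
        }
    }

    // Mirrors idea B): addProcessor() itself recognizes ConnectedStoreProvider.
    void addProcessor(String name, ProcessorLikeSupplier supplier, String... storeNames) {
        Set<String> connected = new LinkedHashSet<>(Arrays.asList(storeNames));
        if (supplier instanceof ConnectedStoreProvider) {
            if (storeNames.length > 0) {
                throw new IllegalArgumentException("Use either stores() or store names, not both");
            }
            for (StoreSpec spec : ((ConnectedStoreProvider) supplier).stores()) {
                addStateStore(spec);      // automatic "add"
                connected.add(spec.name); // automatic "connect"
            }
        }
        for (String store : connected) {
            if (!stores.containsKey(store)) {
                throw new IllegalStateException("Store " + store + " was never added");
            }
        }
        connections.put(name, connected);
    }
}

// A user opts in by implementing both interfaces.
final class CountingSupplier implements ProcessorLikeSupplier, ConnectedStoreProvider {
    @Override
    public Runnable get() {
        return () -> { };
    }

    @Override
    public Set<StoreSpec> stores() {
        return Set.of(new StoreSpec("counts"));
    }
}
```

Adding a second `CountingSupplier` to the same `MiniTopology` fails in `addStateStore`, because each call returns a new `StoreSpec` named "counts". This is the duplicate-add problem that comes up again further down the thread.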
+1 for opening a WIP PR

Best,
Bruno

On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks, Paul!

I agree with all of that. If we think that the general design is good, refactoring a PR if we want to pick a different name should not be too much additional work (hopefully). Thus, if you want to open a WIP PR and we use it to nail down the open details, it might help us reach a good conclusion.

> 2) Default method vs new interface:

This seems to be the hardest tradeoff. I see the point about discoverability... It might be good to get input from others on which version they would prefer.

Just to make it clear, my suggestion from the last email was that `Transformer` etc. does not extend the new interface. Instead, a user that wants to use this feature would need to implement both interfaces.

If `Transformer extends StoreProvider` (just picking a name here) without a default implementation, existing code would break. Thus it is not an option, because it breaks backward compatibility.

-Matthias

On 4/28/19 8:37 PM, Paul Whalen wrote:

Great thoughts Matthias, thanks! I think we're all agreed that naming and documentation/education are the biggest hurdles for this KIP. In light of that, I think it makes sense for me to just take a stab at a full-fledged PR with documentation, to convince us that it's possible to do it with enough clarity.

In response to your specific thoughts:

1) StateStoreConnector as a name: Really good point about defining the difference between "adding" and "connecting." Guozhang suggested StateStoreConnector, which was definitely an improvement over my StateStoresSupplier. But I think you're right that we need to be careful to make it clear that it's really accomplishing both. Thinking about it now, one problem with Connector is that the implementer of the interface is not really doing any connecting. It's providing/supplying the store that will be both added and connected. StoreProvider seems reasonable to me and probably the best candidate at the moment. Still, it would be nice if the name could convey that it's providing the store specifically so the caller can add it to the topology and connect it to the associated transformer.

In general, I think that clearly calling out what "adding" versus "connecting" means in the documentation will help make the entire purpose of this feature clearer to the user.

2) Default method vs new interface: The choice of a default method was influenced by Guozhang's fear about API bloat/discoverability. I can definitely see it both ways. Would the separate interface be a sub-interface of Processor/TransformerSupplier or standalone? It seems like you're suggesting standalone, and I think that's what I favor. My only concern there is that the interface wouldn't actually be a type in any public API, which sort of hurts discoverability. You would have to read the javadocs for stream.process/transform() to discover that implementing the interface in addition to Processor/TransformerSupplier would add and connect the store for you. But that added burden probably actually helps us make sure people don't mix and match, like you said.

3) Returning null instead of empty: Seems fair to me. I always worry about returning null when an empty collection can be used instead. But given that the library is the caller rather than the client, I think your point makes sense here.

4) Returning Set instead of Collection: Agreed, I don't see why not to make it more specific.

Paul

On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:

Hi, sorry for the long pause. Just trying to catch up here.

I think it is safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded", and thus it's not really possible to use the same `StoreBuilder` object to create different stores.

I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.

Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.

Some further comments:

(1) I am not sure if `StateStoreConnector` is the best name for the new interface. Note that there are two concepts about stores:

- adding a store: this makes the store available in the topology in general (however, the store is still "dangling" and not used)
- connecting a store: this allows a processor etc. to use a store

The new interface does both, but its name only indicates the second part, which might be confusing. It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with a new pattern to "auto add+connect". If the new interface name indicates the connect part only, users might think they need to add stores manually and can connect automatically.

Unfortunately, I don't have a much better suggestion for a name either. The only idea that came to my mind was `StoreProvider`. To me, a provider is a "service" interface that does work for us, i.e., it adds and connects a store. I'm not sure whether this is too subtle, considering that there is already the `StoreSupplier` interface.

But maybe somebody else has a good idea on how to improve the name.

In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.

(2) The KIP proposes to add the new interface to `ProcessorSupplier` etc. and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature. I wonder if it might be better not to add the new interface to `ProcessorSupplier` etc., and to just provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches. Think: I only override a default method and my code breaks.

Thoughts?

(3) If we keep the current default implementation for the new method, I am wondering if it should return `null` instead of an empty collection. This might be safer for detecting bugs in user code where an empty collection is returned by accident.

(4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly? That is, returning the same `StoreBuilder` multiple times is idempotent, and one cannot add+connect it twice.

-Matthias

On 4/6/19 12:27 PM, Paul Whalen wrote:

Ivan and Guozhang,

Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.

I will say that I'm not totally sure that most use cases of transform() use just one state store. It's hard to know, since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low-level Processor API when we need that complexity. It's somewhat hard to imagine many use cases that only need one state store, since the high-level DSL can usually accomplish those tasks. The example Ivan presented, for instance, looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what sort of other usages you're imagining.

That being said, perhaps the Processor API should really just be considered a separate paradigm in Streams, not just a lower level that we reach for when necessary. In that case it would be beneficial to make the simple use cases easier. I've definitely talked about this with my own team. If you're less familiar with the kind of functional style that the high-level DSL offers, it might be easier to "see" your state and interact with it directly.

Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. It seems like there is at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.

Paul

On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi all!

I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.

I fully agree with KIP-401's motivation section. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from business logic. Of course, this should be done better in the KStreams API.

But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL. It will not fit all cases, but it will fit most of them. Hence my idea does not exclude Paul's proposal.

What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:

    stream
        .map(...)
        .filter(...)
        .transform((k, v, s) -> {....}, Transformed.with(....))

where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* the return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used to define the Transformer and KeyValueStore building parameters. Some of these parameters should be:
** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- an optional parameter; the system should be able to devise the name of the store transparently for the user, if we don't want to devise it ourselves or share the store between processors,
** scheduled punctuation.

Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.

This could be achieved as simply as

    stream.transform((key, value, stateStore) -> {
            int previousValue =
                Optional.ofNullable(stateStore.get(key)).orElse(0);
            stateStore.put(key, value);
            return KeyValue.pair(key, value - previousValue);
        }
        // we do not need to bother with store name, punctuation etc.
        // maybe even the Serde part can be omitted, since we can inherit
        // the serdes from the stream by default
        , Transformed.with(Serdes.String(), Serdes.Integer())
    );

The hard part of it is that the new `transform` method definition would have to be parameterized by six type parameters:

* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.
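The shape of Ivan's proposal, a lambda that receives the key, the value and a single store, can be tried outside Kafka with a small simulation. This is only a sketch. `Pair` stands in for `KeyValue`, and a `HashMap` stands in for the `KeyValueStore` that `Transformed.with(...)` would configure. `StatefulTransform` and `StatefulTransformDemo` are hypothetical names.

The six type parameters are the input, output and store key/value types he lists. With explicitly typed lambda parameters, Java infers all six at the call site.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for Kafka's KeyValue.
final class Pair<K, V> {
    final K key;
    final V value;

    Pair(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Pair)) {
            return false;
        }
        Pair<?, ?> other = (Pair<?, ?>) o;
        return Objects.equals(key, other.key) && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }

    @Override
    public String toString() {
        return key + "=" + value;
    }
}

// The proposed lambda shape: (key, value, store) -> output pair, with six type parameters.
@FunctionalInterface
interface StatefulTransform<K, V, KR, VR, SK, SV> {
    Pair<KR, VR> apply(K key, V value, Map<SK, SV> store);
}

final class StatefulTransformDemo {
    // Applies the lambda record by record, sharing one store across all records.
    static <K, V, KR, VR, SK, SV> List<Pair<KR, VR>> run(
            List<Pair<K, V>> input, StatefulTransform<K, V, KR, VR, SK, SV> transform) {
        Map<SK, SV> store = new HashMap<>();
        List<Pair<KR, VR>> output = new ArrayList<>();
        for (Pair<K, V> record : input) {
            output.add(transform.apply(record.key, record.value, store));
        }
        return output;
    }

    // Ivan's "derivative" example: emit the delta to the previous value for each key.
    static List<Pair<String, Integer>> deltas(List<Pair<String, Integer>> input) {
        return run(input, (String key, Integer value, Map<String, Integer> store) -> {
            int previousValue = Optional.ofNullable(store.get(key)).orElse(0);
            store.put(key, value);
            return new Pair<>(key, value - previousValue);
        });
    }
}
```

For example, the input `a=5, a=8, b=3, a=6` yields `a=5, a=3, b=3, a=-2`. The first value per key is compared against the default of 0.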
However, it seems that all these types can be inferred from the provided lambda and Transformed.with instances.

What do you think about this?

Regards,

Ivan

On 27.03.2019 20:45, Guozhang Wang wrote:

Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.

Overall, I think I agree with you that passing in the store builder directly, rather than the store name, is more convenient, as it does not require another `addStateStore` call. We just need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this learning curve, but I can be convinced if most people feel it is worthwhile.

Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:

I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP, which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:

- Will there be a new, separate interface for users to implement for the new functionality? No. To hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier. Users can opt in to this functionality by overriding the default implementation, which returns an empty list.
- Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so that the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
- Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
- How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience. It will allow adding the same StoreBuilder multiple times, as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent the issue of accidentally turning two state stores into one by adding them with the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().
So, in summary, the use cases look like:

- 1 transformer/processor that owns its store: it uses the new StateStoresSupplier interface method to supply its StoreBuilders, which will be added to the topology automatically.
- Multiple transformers/processors that share the same store: either
  1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations and is added to the topology manually by the user, or
  2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it.

This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.

Thanks!
Paul

On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

Matthias / Paul,

The concern I had about introducing `StoreBuilderSupplier` is simply that it is another XXSupplier in the public API, so I'd like to ask whether we really have to add it :)

The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:

-----------

    String storeName = "store1";
    builder.addStateStore(new MyStoreBuilder(storeName));
    // Following my proposal, the store name can be passed in and used both for
    // `listStores` and in `Transformer#init`, so the Transformer function does
    // not need to get the constant string name again.
    stream1.transform(new MyTransformerSupplier(storeName));

    // One caveat to admit is that MyTransformerSupplier logic may be unique to
    // `store1`, so it cannot be reused with a different store name anyway.

-----------

While in the latter:

-----------

    // The name just indicates that we may have one such supplier for each store.
    stream1.transform(new MyTransformerSupplierForStore1());

-----------

I understand that the latter makes the API more convenient. The cost is that we still cannot completely remove `builder.addStateStore`; we can only reduce its semantic scope to shared state stores. Hence users need to learn two ways of creating state stores for those two patterns.

My argument is that more public APIs mean a longer learning curve for users and introduce more usage patterns that may confuse them (the proposal I had tries to replace one with the other completely).

Guozhang

On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:

Thanks for the great thoughts Matthias and Guozhang!

If I'm not mistaken, Guozhang's suggestion is the second alternative in my KIP ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own. But to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StoreBuilder.

I'm really intrigued by Matthias's idea, which forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea, and I think it gives a cleaner API than what I proposed. The non-shared store use case is handled well here. The shared store use case is possible, though maybe still not as graceful as we would like: having to add the StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to me.

We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for adding a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added. Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most of them at least. Would a user really use the exact same StoreBuilder when they intend there to be two stores? It might make the API slightly harder to use if a user isn't immediately aware of that subtlety. However, a good error message should ease the pain, and the error would show up immediately during development.

And with regard to Matthias's comment about whether we need to deprecate the existing varargs transform methods: I don't think we need to. Still, it might be nice for there to be only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.

Paul

On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:

Guozhang,

> Regarding the last option to catch the "store already exists" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.

I agree with this concern. From my original email:

> The only disadvantage I see might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).

For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.

-Matthias

On 12/15/18 1:40 AM, Guozhang Wang wrote:

Matthias,

Thanks for your feedback.

Regarding the last option to catch the "store already exists" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.

Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches. We still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of the state stores that it needs, i.e.:

    public interface TransformerSupplier<K, V, R> {
        Transformer<K, V, R> get();

        default List<String> stateStoreNames() {
            return Collections.emptyList();
        }
    }

By doing this, users can still "consolidate" the references to store names in a single place in the transform call, e.g.:

    public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
        private final String storeName;

        public MyTransformerSupplier(final String storeName) {
            this.storeName = storeName;
        }

        public class MyTransformer implements Transformer<K, V, R> {
            private StateStore store;

            ....

            @Override
            public void init(final ProcessorContext context) {
                store = context.getStateStore(storeName);
            }
        }

        @Override
        public Transformer<K, V, R> get() {
            return new MyTransformer();
        }

        // A class overrides the interface's default method; it cannot declare it `default` itself.
        @Override
        public List<String> stateStoreNames() {
            return Collections.singletonList(storeName);
        }
    }

Basically, we move the parameters from the caller of `transform` into the TransformerSuppliers. DSL implementations would not change much: they would simply call `connectStateStore` with the list of names obtained from the provided function.

Guozhang

On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:

Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?

The last argument is a vararg, and thus just keeping the existing API for this part seems to work too, allowing both patterns to be implemented.

Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()`. Users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once, and in this case we require that users don't provide a store name in `transform()`.

Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows us to "auto-wire" a transformer to existing stores (to avoid the issue of adding the same store multiple times).

Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
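Matthias's two-interface variant, and the ordering subtlety Paul later points out, can be sketched as follows. The interface names and their `stateStores()` methods follow his description. `SimpleBuilder`, `TransformerLike`, `OrderedWiring` and the two concrete suppliers are hypothetical stand-ins rather than Kafka types.

The "main" supplier adds and connects its stores. A name-only supplier can only connect, so it fails if it is wired before the main one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a StoreBuilder.
final class SimpleBuilder {
    final String name;

    SimpleBuilder(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for TransformerSupplier.
interface TransformerLike {
}

interface StoreBuilderSupplier {
    List<SimpleBuilder> stateStores();
}

interface StoreNameSupplier {
    List<String> stateStores();
}

final class OrderedWiring {
    private final Set<String> addedStores = new HashSet<>();
    final List<String> log = new ArrayList<>();

    // Models one transform() call in the DSL.
    void transform(String processor, TransformerLike supplier) {
        if (supplier instanceof StoreBuilderSupplier) {
            for (SimpleBuilder builder : ((StoreBuilderSupplier) supplier).stateStores()) {
                if (!addedStores.add(builder.name)) {
                    throw new IllegalStateException("Store " + builder.name + " already added");
                }
                log.add(processor + " adds+connects " + builder.name);
            }
        } else if (supplier instanceof StoreNameSupplier) {
            for (String name : ((StoreNameSupplier) supplier).stateStores()) {
                if (!addedStores.contains(name)) {
                    throw new IllegalStateException("Store " + name + " not added yet; add the main transformer first");
                }
                log.add(processor + " connects " + name);
            }
        }
    }
}

final class MainTransformerSupplier implements TransformerLike, StoreBuilderSupplier {
    @Override
    public List<SimpleBuilder> stateStores() {
        return List.of(new SimpleBuilder("shared"));
    }
}

final class SecondaryTransformerSupplier implements TransformerLike, StoreNameSupplier {
    @Override
    public List<String> stateStores() {
        return List.of("shared");
    }
}
```

Matthias also considers an alternative: always add, and on a duplicate fall back to connecting. That would remove the ordering constraint, but it could no longer tell a deliberate share from an accidental name clash.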
Another possibility to avoid the issue of adding the same stores multiple times would be for the DSL to always call `addStateStore()`, catch a potential "store exists already" exception, and fall back to `connectProcessorAndStateStore()` in that case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either. The only disadvantage I see might be potential bugs with shared state if two different stores are given the same name by mistake (this would not be detected).

Just some ideas I wanted to share. What do you think?

-Matthias

On 12/11/18 3:46 AM, Paul Whalen wrote:

Ah yes, of course, this was an oversight: I completely ignored the case of multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself, and I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible.
Here's a gist showing the rough structure, just for context:
https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
Note how it adds the stores to the topology and also provides a public method with the store names.

I don't think my proposal completely conflicts with the use case of multiple processors sharing state stores, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So for your example, if there were a single TransformerSupplier passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.

Perhaps this suggests that one of the alternatives in my KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
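The "store1 added twice" problem described above is what Matthias's fallback idea targets: always call `addStateStore()`, and on a "store exists already" error connect to the existing store instead. A toy sketch, assuming a hypothetical `StoreExistsException` and a `MiniTopology` that tracks store names only; neither is a Kafka Streams class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AddOrConnectSketch {

    // Hypothetical error a toy topology raises when a store name is added twice.
    static class StoreExistsException extends RuntimeException {
        StoreExistsException(String storeName) {
            super("store exists already: " + storeName);
        }
    }

    // Toy topology: accepts each store name only once, as described in the thread.
    static class MiniTopology {
        final Set<String> stores = new HashSet<>();
        final Map<String, Set<String>> usersByStore = new HashMap<>();

        void addStateStore(String storeName, String processor) {
            if (!stores.add(storeName)) {
                throw new StoreExistsException(storeName);
            }
            connect(processor, storeName);
        }

        void connect(String processor, String storeName) {
            usersByStore.computeIfAbsent(storeName, s -> new HashSet<>()).add(processor);
        }
    }

    // Always try to add the store; if it already exists, just connect to it instead.
    static void addOrConnect(MiniTopology topology, String storeName, String processor) {
        try {
            topology.addStateStore(storeName, processor);
        } catch (StoreExistsException e) {
            topology.connect(processor, storeName);
        }
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology();
        // One supplier passed to two transform() calls asks for "store1" twice.
        addOrConnect(topology, "store1", "transform-1");
        addOrConnect(topology, "store1", "transform-2");
        System.out.println(topology.usersByStore.get("store1").size()); // prints 2
    }
}
```

Because the toy compares names only, it would also silently merge two unrelated stores that happen to share a name, which is exactly the undetected-bug risk Matthias mentions.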
Paul

On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Paul,

Thanks for the great writeup (very detailed and crystal-clear motivation sections!).

This is quite an interesting idea, and I do like the API cleanness you proposed. The original motivation for letting StreamsTopology add state stores, though, is to allow different processors to share a state store. For example:

    builder.addStateStore(...); // a StoreBuilder for a store named "store1"

    // a path of stream transformations that leads to KStream stream1.
    stream1.transform(..., "store1");

    // another path that generates a KStream stream2.
    stream2.transform(..., "store1");

Behind the scenes, Streams will make sure the stream1 / stream2 transformations are always grouped together as a single group of tasks, each of which will be executed by a single thread, and hence there are no concurrency issues when accessing the store from different operators within the same task.
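The grouping described above (processors that share a store always land in the same group of tasks) can be modeled as a connected-components problem. The sketch below uses a small union-find keyed by processor name: connecting a processor to a store that already has a user unites the two. It is a simplified model for illustration only, not Streams' actual grouping code; in particular it ignores the parent/child edges between processors, which also bind nodes into one group. The processor names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class StoreGroupingSketch {

    // Minimal union-find over node names, with path compression.
    static class UnionFind {
        private final Map<String, String> parent = new HashMap<>();

        String find(String node) {
            parent.putIfAbsent(node, node);
            String root = node;
            while (!parent.get(root).equals(root)) {
                root = parent.get(root);
            }
            while (!node.equals(root)) { // point every node on the path at the root
                String next = parent.get(node);
                parent.put(node, root);
                node = next;
            }
            return root;
        }

        void union(String a, String b) {
            String rootA = find(a);
            String rootB = find(b);
            if (!rootA.equals(rootB)) {
                parent.put(rootA, rootB);
            }
        }
    }

    // Unites every processor connected to a store with that store's first user.
    static class StoreGrouper {
        private final UnionFind groups = new UnionFind();
        private final Map<String, String> firstUserByStore = new HashMap<>();

        void connect(String processor, String store) {
            String firstUser = firstUserByStore.putIfAbsent(store, processor);
            if (firstUser != null) {
                groups.union(processor, firstUser);
            }
        }

        boolean sameGroup(String a, String b) {
            return groups.find(a).equals(groups.find(b));
        }
    }

    public static void main(String[] args) {
        StoreGrouper grouper = new StoreGrouper();
        // Two otherwise independent paths both use "store1", as in the example above.
        grouper.connect("stream1-transform", "store1");
        grouper.connect("stream2-transform", "store1");
        System.out.println(grouper.sameGroup("stream1-transform", "stream2-transform")); // prints true
        System.out.println(grouper.sameGroup("stream1-transform", "other-processor"));   // prints false
    }
}
```

Since both transforms end up in one group, each task of that group runs them on the same thread, which is why the shared store needs no extra synchronization.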
I'm not sure how common this use case is, but I'd like to hear if you have any thoughts on maintaining it, since the current proposal seems to exclude this possibility.

Guozhang

On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:

Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Thanks!
Paul