Thanks Paul!

On 8/15/19 7:28 PM, Paul Whalen wrote:
> I updated the KIP (and PR) to relax the restriction on connecting state
> stores via either means; it definitely makes sense to me at this point.
> I'd love to hear if there are any other concerns or broad objections to the
> KIP.
>
> Paul
>
> On Thu, Aug 8, 2019 at 10:12 PM Paul Whalen <pgwha...@gmail.com> wrote:
>
>> Matthias,
>>
>> You did summarize my thinking correctly, thanks for writing it out. I
>> think the disconnect in opinion is due to a couple of things influenced by my
>> habits while writing streams code:
>>
>> 1) I don't see state stores that are "individually owned" versus "shared"
>> as that much different at all, at least from the perspective of the
>> business logic for the Processor. So it is actually a negative to separate
>> the connecting of stores, because it appears in the topology wiring that
>> fewer stores are being used by the Processor than actually are. A reader
>> might assume that the Processor doesn't need other state to do its job,
>> which could cause confusion.
>> 2) In practice, my addProcessor() and addStateStore() (or
>> builder.addStateStore() and stream.process()) calls are very near each
>> other anyway, so the shared dependency on StoreBuilder is not a burden;
>> passing the same object could even bring clarity to the idea that the store
>> is shared and not individually owned.
>>
>> Hearing your thoughts though, I think I have imposed a bit too much of my
>> own style and assumptions on the API, especially with the shared dependency
>> on a single StoreBuilder and thoughts about store ownership/sharing. I'm
>> going to update the KIP since the one +1 vote comes from John, who is in favor
>> of relaxing the restriction anyway.
>>
>> Paul
>>
>> On Wed, Aug 7, 2019 at 11:11 PM Matthias J.
>> Sax <matth...@confluent.io> wrote:
>>
>>> I am not sure if I fully understand, hence, I try to rephrase:
>>>
>>>> I can't think of an example that would require both ways, or would
>>>> even be more readable using both ways.
>>>
>>> Example:
>>>
>>> There are two processors A and B, one store S that both need to
>>> access, and one store S_b that only B needs to access:
>>>
>>> If we don't allow mixing both approaches, it would be required to write
>>> the following code:
>>>
>>> Topology t = new Topology();
>>> t.addProcessor("A", ...); // does not add any store
>>> t.addProcessor("B", ...); // does not add any store
>>> t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>>> t.addStateStore(..., "B"); // adds S_b and connects it to B
>>>
>>> // DSL example:
>>>
>>> StreamsBuilder b = new StreamsBuilder();
>>> b.addStateStore() // adds S
>>> b.addStateStore() // adds S_b
>>> stream1.process(..., "S") // adds A and connects S
>>> stream2.process(..., "S", "S_b") // adds B and connects S and S_b
>>>
>>>
>>> If we allow mixing both approaches, the code could be simplified to:
>>>
>>> Topology t = new Topology();
>>> t.addProcessor("A", ...); // does not add any store
>>> t.addProcessor("B", ...); // adds/connects S_b implicitly
>>> t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>>>
>>> // DSL example
>>>
>>> StreamsBuilder b = new StreamsBuilder();
>>> b.addStateStore() // adds S
>>> stream1.process(..., "S") // adds A and connects S
>>> stream2.process(..., "S") // adds B and connects S; adds/connects S_b implicitly
>>>
>>> The fact that B has a "private store" could be encapsulated and I don't
>>> see why this would be bad?
>>>
>>>> If you can
>>>> do both ways, the actual full set of state stores being connected could be
>>>> in wildly different places in the code, which could create confusion.
>>>
>>> Ie, I don't see why the second version would be confusing, or why the
>>> first version would be more readable (I don't argue it's less readable
>>> either though; I think both are equally readable)?
>>>
>>>
>>>
>>> Or do you argue that we should allow the following:
>>>
>>>> Shared stores can be passed from
>>>> the outside in an anonymous ProcessorSupplier if desired, making it
>>>> effectively the same as passing the stateStoreNames var args
>>>
>>> Topology t = new Topology();
>>> t.addProcessor("A", ...); // adds/connects S implicitly
>>> t.addProcessor("B", ...); // adds/connects S and S_b implicitly
>>>
>>> // DSL example
>>>
>>> StreamsBuilder b = new StreamsBuilder();
>>> stream1.process(...) // adds A and adds/connects S implicitly
>>> stream2.process(...) // adds B and adds/connects S and S_b implicitly
>>>
>>> For this case, the second implicit adding of S would require returning
>>> the same `StoreBuilder` instance to make it idempotent, which seems hard
>>> to achieve, because both `ProcessorSuppliers` now have a cross
>>> dependency to use the same object.
>>>
>>> Hence, I don't think this would be a good approach.
>>>
>>>
>>> Also, because we require that the same `StoreBuilder` instance is always
>>> passed for a unique store name, we actually have good protection against
>>> user bugs that may add two stores with the same name but different
>>> builders.
>>>
>>>
>>> I also do not feel super strong about it, but see some advantages to
>>> allowing the mixed approach, and don't see disadvantages. Would be good to
>>> get input from others, too.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 8/7/19 7:29 PM, Paul Whalen wrote:
>>>> My thinking is that restricting the API to enforce only one way of
>>>> connecting stores would make it simpler to use and lead to more readable
>>>> code. I can't think of an example that would require both ways, or would
>>>> even be more readable using both ways.
>>>> Shared stores can be passed from
>>>> the outside in an anonymous ProcessorSupplier if desired, making it
>>>> effectively the same as passing the stateStoreNames var args. If you can
>>>> do both ways, the actual full set of state stores being connected could be
>>>> in wildly different places in the code, which could create confusion. I
>>>> personally can't imagine a case in which that would be useful.
>>>>
>>>> All that being said, I don't feel terribly strongly about it. I'm just
>>>> trying to make the API as straightforward as possible. Admittedly a
>>>> runtime check doesn't make for a great API, but I see it more as an
>>>> opportunity to educate the user to make it clear that "connecting" a state
>>>> store is a thing that can be done in two different ways, but there is no
>>>> reason to mix both. If it seems like there's a compelling reason to mix
>>>> them then I would abandon the idea in a heartbeat.
>>>>
>>>> Paul
>>>>
>>>> On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Sorry for the long silence on this KIP Paul! I guess the 2.3 release
>>>>> distracted us somewhat.
>>>>>
>>>>> Overall, I am +1.
>>>>>
>>>>> With regard to John's point about owned vs shared state stores, I think
>>>>> it describes a valid use case, and throwing an exception if people want
>>>>> to mix both features might be too restrictive?
>>>>>
>>>>> We could of course later relax the restriction, but atm I am not sure
>>>>> what the main argument for adding the restriction is?
>>>>>
>>>>> (a) In the current API, one could connect the same store multiple times
>>>>> to the same processor without getting an exception, because the
>>>>> operation is idempotent.
>>>>>
>>>>> (b) The KIP also suggests relaxing the current restriction on adding the
>>>>> same store twice, as long as store name and `StoreBuilder` instance are
>>>>> the same, because it's an idempotent (hence, safe) operation too.
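For illustration only (not part of the original thread): the rule in (b) could be sketched in plain Java as below. `StoreBuilderStub` and `StoreRegistrySketch` are hypothetical stand-ins invented for this sketch, not Kafka Streams classes, and the actual check inside the topology builder may look different. The sketch assumes that "same store" means same name and same builder instance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a StoreBuilder: only the store name matters here.
final class StoreBuilderStub {
    final String name;

    StoreBuilderStub(String name) {
        this.name = name;
    }
}

// Hypothetical registry sketching an idempotent "add store" operation.
final class StoreRegistrySketch {
    private final Map<String, StoreBuilderStub> stores = new HashMap<>();

    void addStateStore(StoreBuilderStub builder) {
        StoreBuilderStub existing = stores.putIfAbsent(builder.name, builder);
        // Re-adding the very same instance is a no-op; a different builder
        // under an already-used name is treated as a user bug.
        if (existing != null && existing != builder) {
            throw new IllegalStateException(
                "Store " + builder.name + " was already added with a different builder");
        }
    }

    int size() {
        return stores.size();
    }
}
```

Adding the same `StoreBuilderStub` twice leaves one registered store, while a second, distinct builder with the same name is rejected.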
>>>>> >>>>> Because we have already (a) and (b) and consider both as safe, it seems >>>>> we could also treat the case of mixing both patterns as idempotent and >>>>> hence safe. And if we do this, we enable to mix both patterns for >>>>> different stores implicitly. >>>>> >>>>> >>>>> Thoughts? >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> >>>>> On 6/17/19 2:31 PM, John Roesler wrote: >>>>>> Hey, all, >>>>>> >>>>>> Sorry I'm late to the party. I meant to read into this KIP before, but >>>>>> didn't get around to it. I was just reminded when Paul mentioned it in >>>>>> a different thread. Please feel free to bump a discussion any time it >>>>>> stalls! >>>>>> >>>>>> I've just read through the whole discussion so far, and, to echo the >>>>>> earlier sentiments, the motivation seems very clear. I remember how >>>>>> hard it was to figure out how to actually wire up a stateful processor >>>>>> properly the first couple of times. Not a very good user experience. >>>>>> >>>>>> I looked over the whole conversation to date, as well as the KIP and >>>>>> the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, The >>>>>> current approach looks good to me. I was concerned about the "cheat >>>>>> codes"-style mixin interface. Discoverability would have been a >>>>>> problem, and it's also not a very normal pattern for Java APIs. It >>>>>> actually looks a little more like something you'd do with an >>>>>> annotation. >>>>>> >>>>>> So the current approach seems good: >>>>>> * The new interface with a default to return `null` is effectively >>>>>> shipping the feature flagged "off" (which is nice and safe) >>>>>> * Shared stores are "supported" the same way they always have been, by >>>>>> connecting them externally. This makes sense, since those stores >>>>>> aren't "owned" by any of the connected processors. 
>>>>>> * Processors that do own their stores can configure them in the same >>>>>> file they use them, which decreases the probability of cast exceptions >>>>>> when they get the stores from the context. >>>>>> * Stateful processors that own their stores are available for one-shot >>>>>> definition of the stores and the processor all in the same file (this >>>>>> is the main point of the KIP) >>>>>> >>>>>> The runtime check that stores can't be both defined in the processor >>>>>> and referenced by name might be a little restrictive (since we already >>>>>> have the restriction that same-name stores can't be registered), but >>>>>> it would also be easy to remove it later. I'm just thinking that if I >>>>>> have a processor that owns one store and shares another, it would be >>>>>> pretty obvious how to hook it up in the proposed API, except for that >>>>>> check. >>>>>> >>>>>> One last thought, regarding the all-important interface name: If you >>>>>> wanted to indicate more that the stores are available for Streams to >>>>>> connect, rather than that they are already connected, you could call >>>>>> it ConnectableStoreProvider (similar to AutoCloseable). >>>>>> >>>>>> I just thought I'd summarize the current state, since it's been a >>>>>> while and no one has voted yet. I'll go ahead and vote now on the >>>>>> voting thread, since I'm +1 on the current proposal. >>>>>> >>>>>> Thanks, >>>>>> -John >>>>>> >>>>>> On Mon, May 27, 2019 at 1:59 PM Paul Whalen <pgwha...@gmail.com> >>> wrote: >>>>>>> >>>>>>> It wasn't much of a lift changing option B to work for option C, so I >>>>>>> closed that PR and made a new one, which should be identical to the >>> KIP >>>>>>> right now: https://github.com/apache/kafka/pull/6824. There are a >>> few >>>>>>> todos still which I will hold off until the KIP is accepted. >>>>>>> >>>>>>> I created a voting thread about a month ago, so I'll bump that now >>> that >>>>>>> we're nearly there. 
>>>>>>>
>>>>>>> Paul
>>>>>>>
>>>>>>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Per Matthias's suggestion from a while ago, I actually implemented a good
>>>>>>>> amount of option B to get a sense of the user experience and documentation
>>>>>>>> requirements. For a few reasons mentioned below, I think it's not my
>>>>>>>> favorite option, and I prefer option C. But since I did the work and it
>>>>>>>> can help discussion, I may as well share:
>>>>>>>> https://github.com/apache/kafka/pull/6821.
>>>>>>>>
>>>>>>>> Things I learned along the way implementing Option B:
>>>>>>>> - For the name of the interface, I like ConnectedStoreProvider. It isn't
>>>>>>>> perfect but it seems to capture the general gist without being overly
>>>>>>>> verbose. I get that from a strict standpoint it's not "providing connected
>>>>>>>> stores" but is instead "providing stores to be connected," but I think that
>>>>>>>> in context and with documentation, the risk of someone being confused by
>>>>>>>> that is low.
>>>>>>>> - I definitely felt the discoverability issue while trying to write clear
>>>>>>>> documentation; you really have to make sure to connect the dots for the
>>>>>>>> user when the interface isn't connected to anything.
>>>>>>>> - Another problem with a separate interface found while writing
>>>>>>>> tests/examples: defining a ProcessorSupplier that also implements
>>>>>>>> ConnectedStoreProvider cannot be done anonymously, since you can't define
>>>>>>>> an anonymous class in Java that implements multiple interfaces. I actually
>>>>>>>> consider this a fairly major usability issue - it means a user always has
>>>>>>>> to have a custom class rather than doing it inline. We could provide an
>>>>>>>> abstract class that implements the two, but at that point, we're not that
>>>>>>>> far from option A or C anyway.
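For illustration only (not part of the original thread): the anonymous-class limitation and the abstract-class workaround mentioned in the last bullet can be sketched in plain Java. All type names here (`ProcessorSupplierStub`, `ConnectedStoreProviderStub`, `StoreProvidingSupplierStub`) are hypothetical stand-ins rather than Kafka Streams types, and store names stand in for `StoreBuilder` objects.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for the two separate interfaces of option B.
interface ProcessorSupplierStub {
    String get();
}

interface ConnectedStoreProviderStub {
    Set<String> stores();
}

// Hypothetical abstract base that combines both interfaces, so that an
// anonymous subclass can be written inline.
abstract class StoreProvidingSupplierStub
        implements ProcessorSupplierStub, ConnectedStoreProviderStub {
}

final class AnonymousSupplierSketch {
    static StoreProvidingSupplierStub inlineSupplier() {
        // Java has no syntax for an anonymous class that implements two
        // interfaces at once, e.g.
        //   new ProcessorSupplierStub, ConnectedStoreProviderStub() { ... }
        // does not compile. Extending one abstract class does.
        return new StoreProvidingSupplierStub() {
            @Override
            public String get() {
                return "processor-B";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("S_b");
            }
        };
    }
}
```

With the abstract base, the supplier and its store can be declared inline, which is close to what options A and C give without the extra class.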
>>>>>>>>
>>>>>>>> I updated the KIP with my current thinking, which as mentioned is
>>>>>>>> Matthias's option C. Once again for clarity, that *is not* what is in the
>>>>>>>> linked pull request. The current KIP is my proposal.
>>>>>>>>
>>>>>>>> Thanks everyone for the input!
>>>>>>>>
>>>>>>>> P.S. What do folks use to edit the HTML documentation, e.g.
>>>>>>>> processor-api.html? I looked at doing it by hand but it kind of looked
>>>>>>>> like agony with all the small tags required for formatting code, so I'm
>>>>>>>> sort of assuming there's tooling for it.
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the discussion mixed approaches a little bit, hence, let me
>>>>>>>>> rephrase my understanding:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> A) add new method with default implementation to `ProcessorSupplier`:
>>>>>>>>>
>>>>>>>>> For this case, we don't add a new interface, but only add a new method
>>>>>>>>> to `ProcessorSupplier` -- to keep backward compatibility, we need to add
>>>>>>>>> a default implementation. Users opt into the new feature by overwriting
>>>>>>>>> the default implementation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> B) We add a new interface with new method:
>>>>>>>>>
>>>>>>>>> For this case, `ProcessorSupplier` interface is not changed and it does
>>>>>>>>> also _not_ extend the new interface. Because `ProcessorSupplier` is not
>>>>>>>>> changed, it's naturally backward compatible. Users opt into the new
>>>>>>>>> feature by adding the new interface to their ProcessorSupplier
>>>>>>>>> implementation, and they need to implement the new method because there
>>>>>>>>> is no default implementation. Kafka Streams can use `instanceof` to
>>>>>>>>> detect if the new interface is used or not and thus do the right thing.
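For illustration only (not part of the original thread): option B's `instanceof` detection might look roughly like this in plain Java. `SupplierB`, `StoreProviderB`, and the two supplier classes are hypothetical names made up for this sketch, and store names stand in for `StoreBuilder` objects; how Kafka Streams would actually wire the stores is not shown.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the unchanged supplier interface.
interface SupplierB {
    String get();
}

// Hypothetical stand-alone interface with no default implementation.
interface StoreProviderB {
    Set<String> stores();
}

// Existing user code: implements only the supplier, so nothing changes for it.
final class PlainSupplierB implements SupplierB {
    @Override
    public String get() {
        return "A";
    }
}

// Opting in: the user adds the second interface and must implement stores().
final class StoreAwareSupplierB implements SupplierB, StoreProviderB {
    @Override
    public String get() {
        return "B";
    }

    @Override
    public Set<String> stores() {
        return Collections.singleton("S_b");
    }
}

final class OptionBSketch {
    // The library side checks at runtime whether the supplier opted in.
    static Set<String> storesToConnect(SupplierB supplier) {
        if (supplier instanceof StoreProviderB) {
            return ((StoreProviderB) supplier).stores();
        }
        return Collections.emptySet();
    }
}
```

The compiler forces `StoreAwareSupplierB` to implement `stores()`, which is the advantage of (B); the cost is that nothing on `SupplierB` itself hints that the second interface exists.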
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What was also discussed is a mix of both:
>>>>>>>>>
>>>>>>>>> C) We add a new interface with new method and let `ProcessorSupplier`
>>>>>>>>> extend the new interface:
>>>>>>>>>
>>>>>>>>> Here, we need to add a default implementation to preserve backward
>>>>>>>>> compatibility. Similar to (A), users opt into the feature by overwriting
>>>>>>>>> the default implementation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Option (C) is the same as (A) from a user point of view because a user
>>>>>>>>> won't care about the new interface. It only makes a difference for our
>>>>>>>>> code base, as we can share the default implementation of the new method.
>>>>>>>>> This is only a small gain, as the implementation is trivial, but also a
>>>>>>>>> small drawback, as we add a new public interface that is useless to the
>>>>>>>>> user because the user would never implement the interface directly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For (A/C), it might be simpler for users to detect the feature. For (B),
>>>>>>>>> we have the advantage that users must implement the method if they use
>>>>>>>>> the new interface.
>>>>>>>>>
>>>>>>>>> Overall, it seems that (A) might be the best choice because it makes the
>>>>>>>>> feature more easily discoverable and does not add a "useless" interface. If
>>>>>>>>> you want to go with (C) to share the default implementation code, that's
>>>>>>>>> also fine with me. I am convinced now (even if I brought it up) that
>>>>>>>>> (B) might not be optimal because feature discoverability seems to be
>>>>>>>>> important.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About `null` vs `emptyList`: I still tend to like `null` better but it's
>>>>>>>>> really a detail and not too important. Note that the question only
>>>>>>>>> arises for (A/C), but not for (B), because for (B) we don't need a
>>>>>>>>> default implementation.
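For illustration only (not part of the original thread): options (A/C) with a `null`-returning default could be sketched in plain Java as follows. `StoreProviderC`, `SupplierC`, and `OptionCSketch` are hypothetical names for this sketch, store names stand in for `StoreBuilder` objects, and the null check on the caller side is an assumption about how the library would treat "not overridden".

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical shared interface (option C); for option A the default
// method would sit directly on the supplier interface instead.
interface StoreProviderC {
    // Default keeps existing implementations compiling; null means
    // "feature not used" in this sketch.
    default Set<String> stores() {
        return null;
    }
}

// Hypothetical supplier that inherits the default method.
interface SupplierC extends StoreProviderC {
    String get();
}

final class OptionCSketch {
    // Returns how many stores would be added and connected for the supplier.
    static int storesConnected(SupplierC supplier) {
        Set<String> stores = supplier.stores();
        if (stores == null) {
            return 0; // default not overridden: nothing to add or connect
        }
        return stores.size();
    }

    // A supplier that opts in by overriding the default.
    static SupplierC withPrivateStore() {
        return new SupplierC() {
            @Override
            public String get() {
                return "B";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("S_b");
            }
        };
    }
}
```

Because `SupplierC` still has a single abstract method, existing lambda-style suppliers such as `() -> "A"` keep working unchanged, and an anonymous class can opt in inline.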
>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> @Paul: It's unclear to me atm what your final proposal is because >>> you >>>>>>>>> mentioned that you might want to rename `StateStoreConnector`? It's >>>>> also >>>>>>>>> unclear to me atm, if you prefer (A), (B), or (C). >>>>>>>>> >>>>>>>>> Maybe you can update the KIP if necessary and clearly state what >>> you >>>>>>>>> final proposal is. Beside this, it seems we can move to a VOTE? >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On 5/2/19 3:01 PM, Bruno Cadonna wrote: >>>>>>>>>> Hi Paul, >>>>>>>>>> >>>>>>>>>> I will try to express myself a bit clearer. >>>>>>>>>> >>>>>>>>>> Ad 1) >>>>>>>>>> My assumption is that if `StateStoreConnector#stateStores()` >>> returns >>>>>>>>> `null` >>>>>>>>>> Kafka Streams will throw an NPE because on purpose no null check >>> is >>>>>>>>>> performed before the loop that calls >>>>> `StreamsBuilder#addStateStore()`. >>>>>>>>> When >>>>>>>>>> the user finally understands the cause of the NPE, she knows that >>> she >>>>>>>>> has >>>>>>>>>> to override `StateStoreConnector#stateStores()` in her >>>>> implementation. >>>>>>>>> My >>>>>>>>>> question was, why let the user discover that she has to overwrite >>> the >>>>>>>>>> method at runtime if you could not provide a default >>> implementation >>>>> for >>>>>>>>>> `StateStoreConnector#stateStores()` and let the compiler tell the >>>>> user >>>>>>>>> the >>>>>>>>>> need to overwrite the method. Not providing a default >>> implementation >>>>>>>>>> without separating the interfaces implies not being >>>>> backward-compatible. >>>>>>>>>> That means, if we choose to not provide a default implementation >>> and >>>>> let >>>>>>>>>> the compiler signal the necessity to override the method, we have >>> to >>>>>>>>>> separate the interfaces in any case. 
>>>>>>>>>> >>>>>>>>>> Ad 2) >>>>>>>>>> If you check for `null` or empty list in `process` and do not call >>>>>>>>>> `addStateStores` in those cases, the advantage of returning `null` >>>>> to be >>>>>>>>>> saver to detect bugs as mentioned by Matthias would be lost. But >>>>> maybe >>>>>>>>> I am >>>>>>>>>> missing something here. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Bruno >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> >>>>> wrote: >>>>>>>>>> >>>>>>>>>>> I definitely don't mind anyone jumping, Bruno, thanks for the >>>>> comments! >>>>>>>>>>> >>>>>>>>>>> 1) I'm not totally sure I'm clear on your point, but I think >>> we're >>>>> on >>>>>>>>> the >>>>>>>>>>> same page - if we're adding a method to the XSupplier interfaces >>> (by >>>>>>>>> making >>>>>>>>>>> them inherit from a super interface StateStoreConnector) then we >>>>>>>>> definitely >>>>>>>>>>> need a default implementation to maintain compatibility. Whether >>>>> the >>>>>>>>>>> default implementation returns null or an empty list is somewhat >>> of >>>>> a >>>>>>>>>>> detail. >>>>>>>>>>> >>>>>>>>>>> 2) If stream.process() sees that >>> StateStoreConnector#stateStores() >>>>>>>>> returns >>>>>>>>>>> either null or an empty list, it would handle that case >>> specifically >>>>>>>>> and >>>>>>>>>>> not try to call addStateStore at all. Or is this not what you're >>>>>>>>> asking? >>>>>>>>>>> >>>>>>>>>>> Separately, I'm still hacking away at the details of the PR and >>> will >>>>>>>>>>> continue to get something into a discussable state, but I'll >>> share >>>>> some >>>>>>>>>>> thoughts I've run into. >>>>>>>>>>> >>>>>>>>>>> A) I'm tentatively going the separate interface route (Matthias's >>>>>>>>>>> suggestion) and naming it ConnectedStoreProvider. 
Still don't >>> love >>>>> the >>>>>>>>>>> name, but there's something nice about the name indicating *why* >>>>> this >>>>>>>>> thing >>>>>>>>>>> is providing the store, not just that it is providing it. >>>>>>>>>>> >>>>>>>>>>> B) It has occurred to me that topology.addProcessor() could also >>>>>>>>> recognize >>>>>>>>>>> if ProcessorSupplier implements ConnectedStoreProvider and add >>> and >>>>>>>>> connect >>>>>>>>>>> stores appropriately. This isn't in the KIP and I think the >>>>> value-add >>>>>>>>> is >>>>>>>>>>> lower (if you're reaching that low level, surely the "auto >>>>> add/connect >>>>>>>>>>> store" isn't too important to you), but I think it would be a >>>>>>>>> confusing if >>>>>>>>>>> it didn't, and I don't see any real downside. >>>>>>>>>>> >>>>>>>>>>> Paul >>>>>>>>>>> >>>>>>>>>>> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna < >>> br...@confluent.io> >>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> @Paul: Thank you for the KIP! >>>>>>>>>>>> >>>>>>>>>>>> I hope you do not mind that I jump in. >>>>>>>>>>>> >>>>>>>>>>>> I have the following comments: >>>>>>>>>>>> >>>>>>>>>>>> 1) `null` vs empty list in the default implementation >>>>>>>>>>>> IIUC, returning `null` in the default implementation should >>>>> basically >>>>>>>>>>>> signal that the method `stateStores` was not overridden. Why >>> then >>>>>>>>>>> provide a >>>>>>>>>>>> default implementation in the first place? Without default >>>>>>>>> implementation >>>>>>>>>>>> you would discover the missing implementation already at >>>>> compile-time >>>>>>>>> and >>>>>>>>>>>> not only at runtime. If you decide not to provide a default >>>>>>>>>>> implementation, >>>>>>>>>>>> `XSupplier extends StateStoreConnector` would break existing >>> code >>>>> as >>>>>>>>>>>> Matthias has already pointed out. 
>>>>>>>>>>>> >>>>>>>>>>>> 2) `process` method adding the StoreBuilders to the topology >>>>>>>>>>>> If the default implementation returned `null` and `XSupplier >>>>> extends >>>>>>>>>>>> StateStoreConnector`, then existing code would break, because >>>>>>>>>>>> `StreamsBuilder#addStateStore()` would throw a NPE. >>>>>>>>>>>> >>>>>>>>>>>> +1 for opening a WIP PR >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Bruno >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax < >>>>>>>>> matth...@confluent.io> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thank Paul! >>>>>>>>>>>>> >>>>>>>>>>>>> I agree with all of that. If we think that the general design >>> is >>>>>>>>> good, >>>>>>>>>>>>> refactoring a PR if we want to pick a different name should >>> not be >>>>>>>>> too >>>>>>>>>>>>> much additional work (hopefully). Thus, if you want to open a >>> WIP >>>>> PR >>>>>>>>>>> and >>>>>>>>>>>>> we use it to nail the open details, it might help to find a >>> good >>>>>>>>>>>>> conclusion. >>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) Default method vs new interface: >>>>>>>>>>>>> >>>>>>>>>>>>> This seems to be the hardest tradeoff. I see the point about >>>>>>>>>>>>> discoveability... Might be good to get input from others, which >>>>>>>>> version >>>>>>>>>>>>> they would prefer. >>>>>>>>>>>>> >>>>>>>>>>>>> Just to make clear, my suggestion from the last email was, that >>>>>>>>>>>>> `Transformer` etc does not extend the new interface. Instead, a >>>>> user >>>>>>>>>>>>> that want to use this feature would need to implement both >>>>>>>>> interfaces. >>>>>>>>>>>>> >>>>>>>>>>>>> If `Transformer extends StoreProvider` (just picking a name >>> here) >>>>>>>>>>>>> without default implementation existing code would break and >>> thus >>>>> it >>>>>>>>>>> not >>>>>>>>>>>>> a an option because of breaking backward compatibility. 
>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> On 4/28/19 8:37 PM, Paul Whalen wrote: >>>>>>>>>>>>>> Great thoughts Matthias, thanks! I think we're all agreed that >>>>>>>>> naming >>>>>>>>>>>> and >>>>>>>>>>>>>> documentation/education are the biggest hurdles for this KIP, >>>>> and in >>>>>>>>>>>>> light >>>>>>>>>>>>>> of that, I think it makes sense for me to just take a stab at >>> a >>>>> full >>>>>>>>>>>>>> fledged PR with documentation to convince us that it's >>> possible >>>>> to >>>>>>>>> do >>>>>>>>>>>> it >>>>>>>>>>>>>> with enough clarity. >>>>>>>>>>>>>> >>>>>>>>>>>>>> In response to your specific thoughts: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1) StateStoreConnector as a name: Really good point about >>>>> defining >>>>>>>>>>> the >>>>>>>>>>>>>> difference between "adding" and "connecting." Guozhang >>> suggested >>>>>>>>>>>>>> StateStoreConnector which was definitely an improvement over >>> my >>>>>>>>>>>>>> StateStoresSupplier, but I think you're right that we need to >>> be >>>>>>>>>>>> careful >>>>>>>>>>>>> to >>>>>>>>>>>>>> make it clear that it's really accomplishing both. Thinking >>>>> about >>>>>>>>> it >>>>>>>>>>>>> now, >>>>>>>>>>>>>> one problem with Connector is that the implementer of the >>>>> interface >>>>>>>>>>> is >>>>>>>>>>>>> not >>>>>>>>>>>>>> really doing any connecting, it's providing/supplying the >>> store >>>>> that >>>>>>>>>>>> will >>>>>>>>>>>>>> be both added and connected. StoreProvider seems reasonable >>> to >>>>> me >>>>>>>>>>> and >>>>>>>>>>>>>> probably the best candidate at the moment, but it would be >>> nice >>>>> if >>>>>>>>>>> the >>>>>>>>>>>>> name >>>>>>>>>>>>>> could convey that it's providing the store specifically so the >>>>>>>>> caller >>>>>>>>>>>> can >>>>>>>>>>>>>> add it to the topology and connect it to the associated >>>>> transformer. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> In general I think that really calling out what "adding" >>> versus >>>>>>>>>>>>>> "connecting" is in the documentation will help make the entire >>>>>>>>>>> purpose >>>>>>>>>>>> of >>>>>>>>>>>>>> this feature more clear to the user. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2) Default method vs new interface: The choice of a default >>>>> method >>>>>>>>>>> was >>>>>>>>>>>>>> influenced by Guozhang's fear about API >>> bloat/discoverability. I >>>>>>>>> can >>>>>>>>>>>>>> definitely see it both ways Would the separate interface be >>> a >>>>>>>>>>>>>> sub-interface of Processor/TransformerSupplier or >>> standalone? It >>>>>>>>>>> seems >>>>>>>>>>>>>> like you're suggesting standalone and I think that's what I >>>>> favor. >>>>>>>>>>> My >>>>>>>>>>>>> only >>>>>>>>>>>>>> concern there is that the interface wouldn't actually be a >>> type >>>>> to >>>>>>>>>>> any >>>>>>>>>>>>>> public API which sort of hurts discoverability. You would >>> have >>>>> to >>>>>>>>>>> read >>>>>>>>>>>>> the >>>>>>>>>>>>>> javadocs for stream.process/transform() to discover that >>>>>>>>> implementing >>>>>>>>>>>> the >>>>>>>>>>>>>> interface in addition to Processor/TransformerSupplier would >>> add >>>>> and >>>>>>>>>>>>>> connect the store for you. But that added burden actually >>>>> probably >>>>>>>>>>>> helps >>>>>>>>>>>>>> us in terms of making sure people don't mix and match, like >>> you >>>>>>>>> said. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3) Returning null instead of empty: Seems fair to me. I >>> always >>>>>>>>> worry >>>>>>>>>>>>> about >>>>>>>>>>>>>> returning null when an empty collection can be used instead, >>> but >>>>>>>>>>> given >>>>>>>>>>>>> that >>>>>>>>>>>>>> the library is the caller rather than the client I think your >>>>> point >>>>>>>>>>>> makes >>>>>>>>>>>>>> sense here. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 4) Returning Set instead of Collection: Agreed, don't see why >>>>> not to >>>>>>>>>>>> make >>>>>>>>>>>>>> it more specific. 
Paul

On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:

Hi, sorry for the long pause. Just trying to catch up here.

I think it is safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's not really possible to use the same `StoreBuilder` object to create different stores.

I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.

Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.

Some further comments:

(1) I am not sure if `StateStoreConnector` is the best name for the new interface. Note that there are two concepts about stores:

 - adding a store: this makes the store available in the topology in general (however, the store is still "dangling", and not used)
 - connecting a store: this allows a processor etc. to use a store

The new interface does both, but its name only indicates the second part, which might be confusing. It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with a new pattern to "auto add+connect". If the new interface name indicates the connect part only, users might think they need to add stores manually and can connect automatically.

Unfortunately, I don't have a much better suggestion for a name either. The only idea that came to my mind was `StoreProvider`: to me, a provider is a "service" interface that does work for us, i.e., it adds and connects a store. Not sure if this is too subtle, if we consider that there is already the `StoreSupplier` interface?

But maybe somebody else might still have a good idea on how to improve the name.

In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.

(2) The KIP proposes to add the new interface to `ProcessorSupplier` etc. and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature. I am wondering if it might be better to not add the new interface to `ProcessorSupplier` etc. and to just provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems to be a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches. Think: I only override a default method and my code breaks.

Thoughts?

(3) If we keep the current default implementation for the new method, I am wondering if it should return `null` instead of an empty collection? This might be safer for detecting bugs in user code in which, by accident, an empty collection could be returned.

(4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly (i.e., returning the same `StoreBuilder` multiple times is idempotent and one cannot add+connect to it twice)?

-Matthias

On 4/6/19 12:27 PM, Paul Whalen wrote:

Ivan and Guozhang,

Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.

I will say that I'm not totally sure that most use cases of transform() use just one state store. It's hard to know since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low level processor API when we need that complexity, and it's somewhat hard to imagine many use cases that only need one state store, since the high level DSL can usually accomplish those tasks. The example Ivan presented for instance looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what sort of other usages you're imagining.

That being said, perhaps the Processor API should really just be considered a separate paradigm in Streams, not just a lower level that we reach to when necessary. In which case it would be beneficial to make the simple use cases easier. I've definitely talked about this with my own team - if you're less familiar with the kind of functional style that the high level DSL offers, it might be easier to "see" your state and interact with it directly.

Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. It seems like there is at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.

Paul

On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi all!

I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.

I fully agree with KIP-401's motivation part. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from business logic. Of course this should be done better in the KStreams API.

But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL that will not fit all the cases, but most of them. Hence my idea does not exclude Paul's proposal.

What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:

    stream
        .map(...)
        .filter(...)
        .transform((k, v, s) -> {....}, Transformed.with(....))

where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used in order to define the Transformer and KeyValueStore building parameters. Some of these parameters should be:
** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- optional parameter, the system should be able to devise the name of the store transparently for the user, if we don't want to devise it ourselves/share the store between processors.
** scheduled punctuation.

Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.

This could be achieved as simply as

    stream.transform((key, value, stateStore) -> {
            int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
            stateStore.put(key, value);
            return KeyValue.pair(key, value - previousValue);
        }
        // we do not need to bother with store name, punctuation etc.
        // maybe even the Serde part can be omitted, since we can inherit
        // the serdes from the stream by default
        , Transformed.with(Serdes.String(), Serdes.Integer())
    )

The hard part of it is that the new `transform` method definition should be parameterized by six type parameters:

* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.

However, it seems that all these types can be inferred from the provided lambda and Transformed.with instances.

What do you think about this?

Regards,

Ivan

27.03.2019 20:45, Guozhang Wang wrote:

Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.

Overall I think I agree with you that passing in the StoreBuilder directly rather than the store name is more convenient, as it does not require another `addStore` call, but we just need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this education curve but I can be convinced if most people feel it is worthwhile.

Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:

I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:

 - Will there be a new, separate interface for users to implement for the new functionality? No, to hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation that gives an empty list.
 - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate name and implementation of a state store if desired.
 - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
 - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. This seems to prevent in practice the issue of accidentally making two state stores one by adding with the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow for this relaxation only for internal callers of topology.addStateStore().

So, in summary, the use cases look like:

 - 1 transformer/processor that owns its store: Using the new StateStoresSupplier interface method to supply its StoreBuilders that will be added to the topology automatically.
 - Multiple transformer/processors that share the same store: Either
    1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations, and is added to the topology manually by the user
    2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it

This makes the KIP wiki a bit stale; I'll update if we want to bring this design to a vote.

Thanks!
Paul

On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

Matthias / Paul,

The concern I had about introducing `StoreBuilderSupplier` is simply because it is another XXSupplier to the public API, so I'd like to ask if we really have to add it :)

The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:

-----------

    String storeName = "store1";
    builder.addStore(new MyStoreBuilder(storeName));
    stream1.transform(new MyTransformerSupplier(storeName));
    // following my proposal, the store name can be passed in and used for
    // both `listStores` and in `Transformer#init`; so the Transformer
    // function does not need to get the constant string name again.

    // one caveat to admit is that the MyTransformerSupplier logic may be
    // unique to `store1`, so it cannot be reused with a different store
    // name anyway.

-----------

While in the latter:

-----------

    stream1.transform(new MyTransformerSupplierForStore1());
    // the name is just indicating that we may have one such supplier for
    // each store.

-----------

I understand the latter introduces more convenience from the API, but the cost is that we still cannot completely get rid of `builder.addStore`, but only reduce its semantic scope to shared state stores; hence users need to learn two ways of creating state stores for those two patterns.

My argument is that more public APIs require a longer learning curve for users, and introduce more usage patterns that may confuse users (the proposal I had tries to replace one with another completely).

Guozhang

On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:

Thanks for the great thoughts Matthias and Guozhang!

If I'm not mistaken, Guozhang's suggestion is what my second alternative on the KIP is ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor - it encapsulates the name, but not the StateStoreBuilder.

I'm really intrigued by Matthias's idea that forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea and I think a cleaner API than what I proposed. The non-shared store use case is handled well here, and the shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to me).

We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for adding a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added. Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most at least (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety, but a good error message should ease the pain, and it would happen immediately during development.

And with regards to Matthias's comment about whether we need to deprecate existing varargs transform methods - I don't think we need to, but it might be nice for there only to be one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.

Paul

On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:

Guozhang,

> Regarding the last option to catch "store exist already" exception and
> fallback to connect stores, I'm a bit concerned it may be hiding actual
> user bugs.

I agree with this concern. From my original email:

> The only disadvantage I see, might be potential bugs about sharing state
> if two different stores are named the same by mistake (this would not be
> detected).

For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.

-Matthias

On 12/15/18 1:40 AM, Guozhang Wang wrote:

Matthias,

Thanks for your feedback.

Regarding the last option to catch "store exist already" exception and fallback to connect stores, I'm a bit concerned it may be hiding actual user bugs.

Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e. we still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of state stores that it needs, i.e.:

    public interface TransformerSupplier<K, V, R> {
        Transformer<K, V, R> get();

        default List<String> stateStoreNames() {
            return Collections.emptyList();
        }
    }

By doing this users can still "consolidate" the references of store names in a single place in the transform call, e.g.:

    public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
        private String storeName;

        public class MyTransformer<K, V, R> {

            ....

            init() {
                store = context.getStateStore(storeName);
            }
        }

        // overrides the default from TransformerSupplier
        public List<String> stateStoreNames() {
            return Collections.singletonList(storeName);
        }
    }

Basically, we move the parameters from the caller of `transform` to inside the TransformerSuppliers. DSL implementations would not change much, simply calling `connectStateStore` by getting the list of names from the provided function.

Guozhang

On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:

Just a meta comment: do we really need to deprecate existing `transform()` etc. methods?

The last argument is a vararg, and thus, just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?

Also, instead of adding a default method, we could also add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()` -- users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once; and for this case, we require that users don't provide store names in `transform()`.

Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows "auto-wiring" a transformer to existing stores (to avoid the issue of adding the same store multiple times).

Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added first to the topology. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
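
For illustration, the ordering constraint in this idea can be sketched with a toy model. This is only a sketch under assumptions: `ToyTopology` and `ToyStoreBuilder` are hypothetical stand-ins and not Kafka Streams classes, and only the interface names `StoreBuilderSupplier` and `StoreNameSupplier` are taken from the suggestion above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a StoreBuilder; only the store name matters here.
final class ToyStoreBuilder {
    final String name;
    ToyStoreBuilder(String name) { this.name = name; }
}

// Interface names follow the suggestion above; everything else is assumed.
interface StoreBuilderSupplier { List<ToyStoreBuilder> stateStores(); }
interface StoreNameSupplier { List<String> stateStores(); }

// Toy topology: tracks added stores and which transformer uses which store.
final class ToyTopology {
    private final Set<String> stores = new LinkedHashSet<>();
    private final Map<String, Set<String>> connections = new LinkedHashMap<>();

    // "Main" transformer: adds its stores to the topology, then connects to them.
    void addTransformer(String name, StoreBuilderSupplier supplier) {
        for (ToyStoreBuilder builder : supplier.stateStores()) {
            if (!stores.add(builder.name)) {
                throw new IllegalStateException("store already added: " + builder.name);
            }
            connect(name, builder.name);
        }
    }

    // Other transformers: only connect, so the store must already exist.
    void addTransformer(String name, StoreNameSupplier supplier) {
        for (String store : supplier.stateStores()) {
            if (!stores.contains(store)) {
                throw new IllegalStateException("store not added yet: " + store);
            }
            connect(name, store);
        }
    }

    private void connect(String transformer, String store) {
        connections.computeIfAbsent(transformer, k -> new LinkedHashSet<>()).add(store);
    }

    // Stores connected to the given transformer (empty if none).
    Set<String> storesOf(String transformer) {
        return connections.getOrDefault(transformer, Collections.<String>emptySet());
    }
}
```

In this toy model, adding the name-only transformer before the "main" one fails with an exception, which is the ordering subtlety of the approach.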
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Another possibility to avoid the issue of adding the same >>>>> stores >>>>>>>>>>>>>>>>> multiple times would be, that the DSL always calls >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> `addStateStore()` >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> catches a potential "store exists already" exception and >>> falls >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> back >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> `connectProcessorAndStateStore()` for this case. Thus, we >>>>> would >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> need >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the `StoreNameSupplier` interface and the order in which >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> transformers >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> are added would not matter either. The only disadvantage I >>>>> see, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> might >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> potential bugs about sharing state if two different stores >>> are >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> named >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> same by mistake (this would not be detected). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Just some ideas I wanted to share. What do you think? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 12/11/18 3:46 AM, Paul Whalen wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Ah yes of course, this was an oversight, I completely >>> ignored >>>>> the >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> processors sharing the same state store when writing up the >>>>> KIP. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Which >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> funny, because I've actually done this (different >>> processors >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> sharing >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> state >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> stores) a fair amount myself, and I've settled on a pattern >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> where I >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> group >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the Processors in an enclosing class, and that enclosing >>> class >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> handles >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> much as possible. Here's a gist showing the rough >>> structure, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> just >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> context: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> . Note how it adds the stores to the topology, as well as >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> providing a >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> public method with the store names. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I don't think my proposal completely conflicts with the >>>>> multiple >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> processors >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> sharing state stores use case, since you can create a >>> supplier >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> provides the store name you want, somewhat independently of >>>>> your >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> actual >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Processor logic. The issue I do see though, is that >>>>>>>>>>>>>>>>> topology.addStateStore() can only be called once for a >>> given >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> store. 
> So for your example, if there was a single TransformerSupplier that was
> passed into both transform() calls, "store1" would be added (under the
> hood) to the topology twice, which is no good.
>
> Perhaps this suggests that one of my alternatives on the KIP might be
> desirable: either not having the suppliers return StoreBuilders (just
> store names), or not deprecating the old methods that take
> "String... stateStoreNames". I'll have to think about it a bit.
>
> Paul
>
> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Paul,
>
> Thanks for the great writeup (very detailed and crystal-clear motivation
> sections!).
>
> This is quite an interesting idea and I do like the API cleanness you
> proposed.
> The original motivation of letting StreamsTopology add state stores
> though, is to allow different processors to share the state store.
> For example:
>
> builder.addStore("store1");
>
> // a path of stream transformations that leads to KStream stream1.
> stream1.transform(..., "store1");
>
> // another path that generates a KStream stream2.
> stream2.transform(..., "store1");
>
> Behind the scenes, Streams will make sure stream1 / stream2
> transformations will always be grouped together as a single group of
> tasks, each of which will be executed by a single thread, and hence there
> are no concurrency issues on accessing the store from different operators
> within the same task. I'm not sure how common this use case is, but I'd
> like to hear if you have any thoughts on maintaining this, since the
> current proposal seems to exclude this possibility.
>
> Guozhang
>
> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>
> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> think could greatly increase the usability of the low-level processor
> API. I have some code written but will wait to see if there is buy-in
> before going all out and creating a pull request. It seems like most of
> the work would be in updating documentation and tests.
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>
> Thanks!
> Paul
>
> --
> -- Guozhang
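
For readers following the thread, here is a minimal sketch of the fallback idea raised earlier in the discussion: always try to add the store, and connect to it instead when a store with that name already exists. It uses a toy in-memory registry rather than the Kafka Streams API. `ToyTopology`, `addOrConnect`, `processorsFor`, and the use of `IllegalStateException` as the "store exists already" signal are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class StoreFallbackSketch {

    /** Toy stand-in for a topology; NOT the Kafka Streams Topology class. */
    static class ToyTopology {
        // store name -> names of the processors connected to that store
        private final Map<String, Set<String>> connections = new HashMap<>();

        /** Registers a new store; rejects a name that is already registered. */
        void addStateStore(String storeName, String processorName) {
            if (connections.containsKey(storeName)) {
                throw new IllegalStateException("store exists already: " + storeName);
            }
            Set<String> processors = new TreeSet<>();
            processors.add(processorName);
            connections.put(storeName, processors);
        }

        /** Connects a processor to a store that has already been registered. */
        void connectProcessorAndStateStore(String processorName, String storeName) {
            Set<String> processors = connections.get(storeName);
            if (processors == null) {
                throw new IllegalStateException("unknown store: " + storeName);
            }
            processors.add(processorName);
        }

        /** Returns the processors connected to a store (empty if unknown). */
        Set<String> processorsFor(String storeName) {
            return connections.getOrDefault(storeName, new TreeSet<>());
        }
    }

    /** Always tries to add the store first and falls back to connecting. */
    static void addOrConnect(ToyTopology topology, String storeName, String processorName) {
        try {
            topology.addStateStore(storeName, processorName);
        } catch (IllegalStateException storeExistsAlready) {
            // Hypothetical fallback: the store is already there, so only connect.
            topology.connectProcessorAndStateStore(processorName, storeName);
        }
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        // Two transformers asking for the same store name, in either order.
        addOrConnect(topology, "store1", "transformer-1");
        addOrConnect(topology, "store1", "transformer-2");
        System.out.println(topology.processorsFor("store1"));
    }
}
```

With this shape the order in which the two transformers are added does not matter. The drawback mentioned in the thread shows up here too: two different stores that are given the same name by mistake would silently end up sharing one entry.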