It wasn't much of a lift changing option B to work for option C, so I closed that PR and made a new one, which should be identical to the KIP right now: https://github.com/apache/kafka/pull/6824. There are still a few TODOs, which I will hold off on until the KIP is accepted.
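Option C, which the new PR is meant to match, can be sketched in plain Java. The types below (`StoreBuilder`, `Processor`, `ProcessorSupplier`, `ConnectedStoreProvider`, `OptionCDemo`) are simplified, hypothetical stand-ins declared locally, not the real Kafka Streams interfaces. The `null` default follows the preference Matthias states further down in the thread. Because `ProcessorSupplier` extends the new interface, existing lambda suppliers keep compiling, and opting in only requires overriding `stores()`, even in an anonymous class.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical, simplified stand-ins for the types discussed in the thread.
// They are NOT the real org.apache.kafka.streams interfaces.
interface StoreBuilder<T> {
    String name();
}

interface Processor<K, V> {
    void process(K key, V value);
}

// Option C: the new method lives in its own small interface ...
interface ConnectedStoreProvider {
    // ... with a default so that existing implementations keep compiling.
    default Set<StoreBuilder<?>> stores() {
        return null; // null = "not overridden, no stores declared"
    }
}

// ... and the existing supplier interface extends it.
interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
    Processor<K, V> get();
}

class OptionCDemo {
    // Pre-existing user code: only get() is implemented, so a lambda still works.
    static ProcessorSupplier<String, Long> legacySupplier() {
        return () -> (key, value) -> { };
    }

    // Opting in: an anonymous class overrides get() and stores() inline.
    static ProcessorSupplier<String, Long> encapsulatingSupplier() {
        StoreBuilder<Object> counts = () -> "counts";
        return new ProcessorSupplier<String, Long>() {
            @Override
            public Processor<String, Long> get() {
                return (key, value) -> { };
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(counts);
            }
        };
    }
}
```

With this shape, `legacySupplier().stores()` returns `null`, meaning nothing is declared. The anonymous supplier returns a one-element set holding the `counts` builder.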
I created a voting thread about a month ago, so I'll bump that now that we're nearly there.

Paul

On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com> wrote:

> Per Matthias's suggestion from a while ago, I actually implemented a good amount of option B to get a sense of the user experience and documentation requirements. For a few reasons mentioned below, I think it's not my favorite option, and I prefer option C. But since I did the work and it can help the discussion, I may as well share: https://github.com/apache/kafka/pull/6821.
>
> Things I learned along the way implementing option B:
> - For the name of the interface, I like ConnectedStoreProvider. It isn't perfect, but it seems to capture the general gist without being overly verbose. I get that, strictly speaking, it's not "providing connected stores" but rather "providing stores to be connected," but I think that in context and with documentation, the risk of someone being confused by that is low.
> - I definitely felt the discoverability issue while trying to write clear documentation; you really have to make sure to connect the dots for the user when the interface isn't connected to anything.
> - Another problem with a separate interface, found while writing tests/examples: a ProcessorSupplier that also implements ConnectedStoreProvider cannot be defined anonymously, since an anonymous class in Java cannot implement multiple interfaces. I actually consider this a fairly major usability issue: it means a user always has to write a custom class rather than doing it inline. We could provide an abstract class that implements the two, but at that point, we're not that far from option A or C anyway.
>
> I updated the KIP with my current thinking, which, as mentioned, is Matthias's option C. Once again for clarity, that *is not* what is in the linked pull request. The current KIP is my proposal.
>
> Thanks everyone for the input!
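A minimal option B sketch shows Paul's point about anonymous classes. Every type here is a simplified, hypothetical stand-in rather than the Kafka Streams API. `StoreProvidingProcessorSupplier` is an illustrative name for the "abstract class that implements the two" that he mentions. A Java anonymous class can extend one class or implement one interface, so an inline supplier that also provides stores needs such a combining type. The library side would detect opt-in with `instanceof`, as Matthias describes for option B.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical, simplified stand-ins; not the real Kafka Streams API.
interface StoreBuilder<T> {
    String name();
}

interface Processor<K, V> {
    void process(K key, V value);
}

// Option B: ProcessorSupplier itself is unchanged ...
interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

// ... and the new interface stands alone, with no default implementation.
interface ConnectedStoreProvider {
    Set<StoreBuilder<?>> stores();
}

// An anonymous class may implement only ONE interface, so an inline supplier
// that also provides stores needs a type combining both (hypothetical name).
abstract class StoreProvidingProcessorSupplier<K, V>
        implements ProcessorSupplier<K, V>, ConnectedStoreProvider {
}

class OptionBDemo {
    // Library side: detect opt-in with instanceof, as described for option B.
    static Set<StoreBuilder<?>> declaredStores(ProcessorSupplier<?, ?> supplier) {
        if (supplier instanceof ConnectedStoreProvider) {
            return ((ConnectedStoreProvider) supplier).stores();
        }
        return Collections.emptySet();
    }

    static ProcessorSupplier<String, Long> inlineSupplier() {
        StoreBuilder<Object> counts = () -> "counts";
        return new StoreProvidingProcessorSupplier<String, Long>() {
            @Override
            public Processor<String, Long> get() {
                return (key, value) -> { };
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(counts);
            }
        };
    }
}
```

Under option C no helper type is needed, because the supplier interface already carries `stores()`. That is why Paul notes that the abstract class brings option B close to A or C anyway.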
>
> P.S. What do folks use to edit the HTML documentation, e.g. processor-api.html? I looked at doing it by hand, but it looked like agony with all the small tags required for formatting code, so I'm assuming there's tooling for it.
>
> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I think the discussion mixed approaches a little bit, so let me rephrase my understanding:
>>
>> A) Add a new method with a default implementation to `ProcessorSupplier`:
>>
>> For this case, we don't add a new interface, but only add a new method to `ProcessorSupplier`. To keep backward compatibility, we need to add a default implementation. Users opt into the new feature by overriding the default implementation.
>>
>> B) Add a new interface with a new method:
>>
>> For this case, the `ProcessorSupplier` interface is not changed, and it does _not_ extend the new interface. Because `ProcessorSupplier` is not changed, it's naturally backward compatible. Users opt into the new feature by adding the new interface to their ProcessorSupplier implementation, and they need to implement the new method because there is no default implementation. Kafka Streams can use `instanceof` to detect whether the new interface is used and thus do the right thing.
>>
>> What was also discussed is a mix of both:
>>
>> C) Add a new interface with a new method and let `ProcessorSupplier` extend the new interface:
>>
>> Here, we need to add a default implementation to preserve backward compatibility. Similar to (A), users opt into the feature by overriding the default implementation.
>>
>> Option (C) is the same as (A) from a user's point of view, because a user won't care about the new interface.
>> It only makes a difference for our code base, as we can share the default implementation of the new method. This is only a small gain, as the implementation is trivial, but also a small drawback, as we add a new public interface that is useless to the user because the user would never implement the interface directly.
>>
>> For (A/C), it might be simpler for users to discover the feature. For (B), we have the advantage that users must implement the method if they use the new interface.
>>
>> Overall, it seems that (A) might be the best choice because it makes the feature easier to discover and does not add a "useless" interface. If you want to go with (C) to share the default implementation code, that's also fine with me. I am now convinced (even though I brought it up) that (B) might not be optimal, because feature discoverability seems to be important.
>>
>> About `null` vs `emptyList`: I still tend to like `null` better, but it's really a detail and not too important. Note that the question only arises for (A/C), not for (B), because for (B) we don't need a default implementation.
>>
>> @Paul: It's unclear to me at the moment what your final proposal is, because you mentioned that you might want to rename `StateStoreConnector`. It's also unclear to me whether you prefer (A), (B), or (C).
>>
>> Maybe you can update the KIP if necessary and clearly state what your final proposal is. Besides this, it seems we can move to a VOTE?
>>
>> -Matthias
>>
>> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
>>
>>> Hi Paul,
>>>
>>> I will try to express myself a bit more clearly.
>>>
>>> Ad 1)
>>> My assumption is that if `StateStoreConnector#stateStores()` returns `null`, Kafka Streams will throw an NPE, because on purpose no null check is performed before the loop that calls `StreamsBuilder#addStateStore()`.
>>> When the user finally understands the cause of the NPE, she knows that she has to override `StateStoreConnector#stateStores()` in her implementation. My question was: why let the user discover at runtime that she has to override the method, when you could instead not provide a default implementation for `StateStoreConnector#stateStores()` and let the compiler tell the user that the method needs to be overridden? Not providing a default implementation without separating the interfaces implies not being backward compatible. That means that if we choose not to provide a default implementation and let the compiler signal the need to override the method, we have to separate the interfaces in any case.
>>>
>>> Ad 2)
>>> If you check for `null` or an empty list in `process` and do not call `addStateStores` in those cases, the advantage of returning `null` (being safer for detecting bugs, as mentioned by Matthias) would be lost. But maybe I am missing something here.
>>>
>>> Best,
>>> Bruno
>>>
>>> On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> wrote:
>>>
>>>> I definitely don't mind anyone jumping in, Bruno, thanks for the comments!
>>>>
>>>> 1) I'm not totally sure I'm clear on your point, but I think we're on the same page: if we're adding a method to the XSupplier interfaces (by making them inherit from a super interface StateStoreConnector), then we definitely need a default implementation to maintain compatibility. Whether the default implementation returns null or an empty list is somewhat of a detail.
>>>>
>>>> 2) If stream.process() sees that StateStoreConnector#stateStores() returns either null or an empty list, it would handle that case specifically and not try to call addStateStore at all. Or is this not what you're asking?
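The wiring step Paul describes in point 2 might look roughly like the sketch below. `StateStoreConnector#stateStores()` mirrors the name used at this point in the thread. `WiringSketch` and its two lists are a hypothetical stand-in for the topology that records "add" and "connect" as strings. The explicit guard is what avoids the NullPointerException Bruno mentions. This also illustrates his counterpoint: once `null` is silently skipped like this, it no longer helps surface bugs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins; names follow the thread at this point, not Kafka's API.
interface StoreBuilder<T> {
    String name();
}

interface StateStoreConnector {
    default Collection<StoreBuilder<?>> stateStores() {
        return null; // default: the supplier declares no stores
    }
}

// Records what stream.process() would do, instead of building a real topology.
class WiringSketch {
    final List<String> addedStores = new ArrayList<>();
    final List<String> connections = new ArrayList<>();

    void wire(String processorName, StateStoreConnector supplier) {
        Collection<StoreBuilder<?>> stores = supplier.stateStores();
        // Without this guard, the for-loop below would throw a
        // NullPointerException for suppliers that keep the default.
        if (stores == null || stores.isEmpty()) {
            return; // nothing to add or connect
        }
        for (StoreBuilder<?> builder : stores) {
            addedStores.add(builder.name());                         // "add"
            connections.add(processorName + "->" + builder.name());  // "connect"
        }
    }
}
```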
>>>>
>>>> Separately, I'm still hacking away at the details of the PR and will continue to get something into a discussable state, but I'll share some thoughts I've run into.
>>>>
>>>> A) I'm tentatively going the separate-interface route (Matthias's suggestion) and naming it ConnectedStoreProvider. I still don't love the name, but there's something nice about the name indicating *why* this thing is providing the store, not just that it is providing it.
>>>>
>>>> B) It has occurred to me that topology.addProcessor() could also recognize whether a ProcessorSupplier implements ConnectedStoreProvider, and add and connect stores appropriately. This isn't in the KIP, and I think the value-add is lower (if you're reaching that low level, surely the "auto add/connect store" isn't too important to you), but I think it would be confusing if it didn't, and I don't see any real downside.
>>>>
>>>> Paul
>>>>
>>>> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> @Paul: Thank you for the KIP!
>>>>>
>>>>> I hope you do not mind that I jump in.
>>>>>
>>>>> I have the following comments:
>>>>>
>>>>> 1) `null` vs empty list in the default implementation
>>>>> IIUC, returning `null` in the default implementation should basically signal that the method `stateStores` was not overridden. Why, then, provide a default implementation in the first place? Without a default implementation, you would discover the missing implementation at compile time rather than only at runtime. If you decide not to provide a default implementation, `XSupplier extends StateStoreConnector` would break existing code, as Matthias has already pointed out.
>>>>>
>>>>> 2) `process` method adding the StoreBuilders to the topology
>>>>> If the default implementation returned `null` and `XSupplier extends StateStoreConnector`, then existing code would break, because `StreamsBuilder#addStateStore()` would throw an NPE.
>>>>>
>>>>> +1 for opening a WIP PR
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Thanks Paul!
>>>>>>
>>>>>> I agree with all of that. If we think that the general design is good, refactoring a PR if we want to pick a different name should not be too much additional work (hopefully). Thus, if you want to open a WIP PR and we use it to nail down the open details, it might help us reach a good conclusion.
>>>>>>
>>>>>>>> 2) Default method vs new interface:
>>>>>>
>>>>>> This seems to be the hardest tradeoff. I see the point about discoverability... It might be good to get input from others on which version they would prefer.
>>>>>>
>>>>>> Just to make it clear, my suggestion from the last email was that `Transformer` etc. does not extend the new interface. Instead, a user who wants to use this feature would need to implement both interfaces.
>>>>>>
>>>>>> If `Transformer extends StoreProvider` (just picking a name here) without a default implementation, existing code would break, and thus it's not an option, because it breaks backward compatibility.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/28/19 8:37 PM, Paul Whalen wrote:
>>>>>>
>>>>>>> Great thoughts Matthias, thanks!
>>>>>>> I think we're all agreed that naming and documentation/education are the biggest hurdles for this KIP, and in light of that, I think it makes sense for me to just take a stab at a full-fledged PR with documentation to convince us that it's possible to do it with enough clarity.
>>>>>>>
>>>>>>> In response to your specific thoughts:
>>>>>>>
>>>>>>> 1) StateStoreConnector as a name: Really good point about defining the difference between "adding" and "connecting." Guozhang suggested StateStoreConnector, which was definitely an improvement over my StateStoresSupplier, but I think you're right that we need to be careful to make it clear that it's really accomplishing both. Thinking about it now, one problem with Connector is that the implementer of the interface is not really doing any connecting; it's providing/supplying the store that will be both added and connected. StoreProvider seems reasonable to me and is probably the best candidate at the moment, but it would be nice if the name could convey that it's providing the store specifically so the caller can add it to the topology and connect it to the associated transformer.
>>>>>>>
>>>>>>> In general, I think that really calling out what "adding" versus "connecting" is in the documentation will help make the entire purpose of this feature clearer to the user.
>>>>>>>
>>>>>>> 2) Default method vs new interface: The choice of a default method was influenced by Guozhang's fear about API bloat/discoverability. I can definitely see it both ways. Would the separate interface be a sub-interface of Processor/TransformerSupplier or standalone?
>>>>>>> It seems like you're suggesting standalone, and I think that's what I favor. My only concern there is that the interface wouldn't actually appear as a type in any public API, which sort of hurts discoverability. You would have to read the javadocs for stream.process/transform() to discover that implementing the interface in addition to Processor/TransformerSupplier would add and connect the store for you. But that added burden probably actually helps us in terms of making sure people don't mix and match, like you said.
>>>>>>>
>>>>>>> 3) Returning null instead of empty: Seems fair to me. I always worry about returning null when an empty collection can be used instead, but given that the library is the caller rather than the client, I think your point makes sense here.
>>>>>>>
>>>>>>> 4) Returning Set instead of Collection: Agreed, I don't see why not to make it more specific.
>>>>>>>
>>>>>>> Paul
>>>>>>>
>>>>>>> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Hi, sorry for the long pause. Just trying to catch up here.
>>>>>>>>
>>>>>>>> I think it's safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded", and thus it's not really possible to use the same `StoreBuilder` object to create different stores.
>>>>>>>>
>>>>>>>> I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.
>>>>>>>>
>>>>>>>> Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.
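The idempotence rule discussed here could work as follows: adding the same `StoreBuilder` object twice is a no-op, while a different builder under an already-registered name still fails. This is a minimal sketch that uses a hypothetical `StoreRegistry` and a stand-in `StoreBuilder` interface rather than Kafka's `Topology`. The check uses reference identity (`!=`), not `equals()`, which matches the "same object" wording in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store builder; only the name matters here.
interface StoreBuilder<T> {
    String name();
}

// Hypothetical registry: addStateStore() is idempotent for the SAME builder
// instance, but rejects a different builder registered under the same name.
class StoreRegistry {
    private final Map<String, StoreBuilder<?>> stores = new HashMap<>();

    void addStateStore(StoreBuilder<?> builder) {
        StoreBuilder<?> existing = stores.get(builder.name());
        if (existing == null) {
            stores.put(builder.name(), builder);
        } else if (existing != builder) { // reference identity, not equals()
            throw new IllegalArgumentException(
                "A different StoreBuilder was already added for store '" + builder.name() + "'");
        }
        // existing == builder: a repeated add of the same instance is a no-op
    }

    int size() {
        return stores.size();
    }
}
```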
>>>>>>>>
>>>>>>>> Some further comments:
>>>>>>>>
>>>>>>>> (1) I am not sure if `StateStoreConnector` is the best name for the new interface. Note that there are two concepts about stores:
>>>>>>>>
>>>>>>>> - adding a store: this makes the store available in the topology in general (however, the store is still "dangling" and not used)
>>>>>>>> - connecting a store: this allows a processor etc. to use a store
>>>>>>>>
>>>>>>>> The new interface does both, but its name only indicates the second part, which might be confusing. It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with the new "auto add+connect" pattern. If the new interface name indicates the connect part only, users might think they need to add stores manually and can connect them automatically.
>>>>>>>>
>>>>>>>> Unfortunately, I don't have a much better suggestion for a name either. The only idea that came to my mind was `StoreProvider`: to me, a provider is a "service" interface that does work for us, i.e., it adds and connects a store. Not sure if this is too subtle, considering that there is already the `StoreSupplier` interface?
>>>>>>>>
>>>>>>>> But maybe somebody else has a good idea on how to improve the name.
>>>>>>>>
>>>>>>>> In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.
>>>>>>>>
>>>>>>>> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc. and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature.
>>>>>>>> I am wondering if it might be better not to add the new interface to `ProcessorSupplier` etc. and just provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches. Think: I only override a default method and my code breaks.
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> (3) If we keep the current default implementation for the new method, I am wondering if it should return `null` instead of an empty collection? This might be safer for detecting bugs in user code that accidentally returns an empty collection.
>>>>>>>>
>>>>>>>> (4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly (i.e., returning the same `StoreBuilder` multiple times is idempotent, and one cannot add+connect it twice)?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>>>>>>>
>>>>>>>>> Ivan and Guozhang,
>>>>>>>>>
>>>>>>>>> Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.
>>>>>>>>>
>>>>>>>>> I will say that I'm not totally sure that most use cases of transform() use just one state store.
>>>>>>>>> It's hard to know since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low-level Processor API when we need that complexity, and it's somewhat hard to imagine many use cases that only need one state store, since the high-level DSL can usually accomplish those tasks. The example Ivan presented, for instance, looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what sort of other usages you're imagining.
>>>>>>>>>
>>>>>>>>> That being said, perhaps the Processor API should really just be considered a separate paradigm in Streams, not just a lower level that we reach for when necessary. In that case, it would be beneficial to make the simple use cases easier. I've definitely talked about this with my own team: if you're less familiar with the kind of functional style that the high-level DSL offers, it might be easier to "see" your state and interact with it directly.
>>>>>>>>>
>>>>>>>>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. There seems to be at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.
>>>>>>>>>
>>>>>>>>> Paul
>>>>>>>>>
>>>>>>>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all!
>>>>>>>>>>
>>>>>>>>>> I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.
>>>>>>>>>>
>>>>>>>>>> I fully agree with KIP-401's motivation section. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from the business logic. Of course, this should be done better in the KStreams API.
>>>>>>>>>>
>>>>>>>>>> But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL that will not fit all cases, but will fit most of them. Hence, my idea does not exclude Paul's proposal.
>>>>>>>>>>
>>>>>>>>>> What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:
>>>>>>>>>>
>>>>>>>>>>     stream
>>>>>>>>>>         .map(...)
>>>>>>>>>>         .filter(...)
>>>>>>>>>>         .transform((k, v, s) -> {....}, Transformed.with(....))
>>>>>>>>>>
>>>>>>>>>> where
>>>>>>>>>> * k, v -- input key & value
>>>>>>>>>> * s -- a KeyValueStore provided as an argument
>>>>>>>>>> * the return value of the lambda should be KeyValue.pair(...)
>>>>>>>>>> * Transformed.with... is a builder, used to define the Transformer and KeyValueStore building parameters. Some of these parameters should be:
>>>>>>>>>> ** the store's key Serde,
>>>>>>>>>> ** the store's value Serde,
>>>>>>>>>> ** whether the store is persistent or in-memory,
>>>>>>>>>> ** the store's name -- an optional parameter; the system should be able to devise the name of the store transparently for the user if we don't want to name it ourselves or share the store between processors,
>>>>>>>>>> ** scheduled punctuation.
>>>>>>>>>>
>>>>>>>>>> Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.
>>>>>>>>>>
>>>>>>>>>> This could be achieved as simply as:
>>>>>>>>>>
>>>>>>>>>>     stream.transform((key, value, stateStore) -> {
>>>>>>>>>>             int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>>>>>>>             stateStore.put(key, value);
>>>>>>>>>>             return KeyValue.pair(key, value - previousValue);
>>>>>>>>>>         },
>>>>>>>>>>         // we do not need to bother with the store name, punctuation, etc.
>>>>>>>>>>         // maybe even the Serde part can be omitted, since we can inherit
>>>>>>>>>>         // the serdes from the stream by default
>>>>>>>>>>         Transformed.with(Serdes.String(), Serdes.Integer()));
>>>>>>>>>>
>>>>>>>>>> The hard part is that the new `transform` method definition would have to be parameterized by six type parameters:
>>>>>>>>>>
>>>>>>>>>> * input/output/KeyValueStore key type,
>>>>>>>>>> * input/output/KeyValueStore value type.
>>>>>>>>>>
>>>>>>>>>> However, it seems that all these types can be inferred from the provided lambda and the Transformed.with instance.
>>>>>>>>>>
>>>>>>>>>> What do you think about this?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan
>>>>>>>>>>
>>>>>>>>>> 27.03.2019 20:45, Guozhang Wang wrote:
>>>>>>>>>>
>>>>>>>>>> Hello Paul,
>>>>>>>>>>
>>>>>>>>>> Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.
>>>>>>>>>>
>>>>>>>>>> Overall, I think I agree with you that passing in the store builder directly rather than the store name is more convenient, as it does not require another `addStore` call, but we just need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this education curve, but I can be convinced if most people feel it is worth it.
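Ivan's `derivative` example from earlier in this part of the thread can be simulated without Kafka to check the logic of the lambda. In this sketch, the three-argument `StoreTransformer` interface is a hypothetical stand-in for the proposed `transform` signature; no such method exists in Kafka Streams. A `HashMap` plays the role of the `KeyValueStore`, and `AbstractMap.SimpleEntry` replaces `KeyValue.pair`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical shape of Ivan's proposed (key, value, store) lambda; a plain Map
// stands in for the KeyValueStore that the library would provide.
@FunctionalInterface
interface StoreTransformer<K, V, S, VR> {
    Map.Entry<K, VR> apply(K key, V value, Map<K, S> store);
}

class DerivativeSketch {
    // The lambda from Ivan's example: emit the value minus the previous value for the key.
    static final StoreTransformer<String, Integer, Integer, Integer> DELTA =
        (key, value, store) -> {
            int previousValue = Optional.ofNullable(store.get(key)).orElse(0);
            store.put(key, value);
            return new AbstractMap.SimpleEntry<>(key, value - previousValue);
        };

    // Feeds records through the transform one at a time, sharing one store.
    static List<Map.Entry<String, Integer>> run(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> store = new HashMap<>();
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            output.add(DELTA.apply(record.getKey(), record.getValue(), store));
        }
        return output;
    }
}
```

Feeding the records a:5, a:8, b:3, a:6 through `run()` yields the deltas 5, 3, 3 and -2. The first value for each key is compared against 0, and every later value against the previous value for that key.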
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP that combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:
>>>>>>>>>>
>>>>>>>>>> - Will there be a new, separate interface for users to implement for the new functionality? No. To hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation that returns an empty list.
>>>>>>>>>> - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
>>>>>>>>>> - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
>>>>>>>>>> - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent the issue of accidentally collapsing two state stores into one by adding them with the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().
>>>>>>>>>>
>>>>>>>>>> So, in summary, the use cases look like this:
>>>>>>>>>>
>>>>>>>>>> - 1 transformer/processor that owns its store: it uses the new StateStoresSupplier interface method to supply its StoreBuilders, which will be added to the topology automatically.
>>>>>>>>>> - Multiple transformers/processors that share the same store: either
>>>>>>>>>>   1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations and is added to the topology manually by the user.
>>>>>>>>>>   2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it.
>>>>>>>>>>
>>>>>>>>>> This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Paul
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Matthias / Paul,
>>>>>>>>>>
>>>>>>>>>> The concern I had about introducing `StoreBuilderSupplier` is simply that it is another XXSupplier in the public API, so I'd like to ask if we really have to add it :)
>>>>>>>>>>
>>>>>>>>>> The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:
>>>>>>>>>>
>>>>>>>>>> -----------
>>>>>>>>>> String storeName = "store1";
>>>>>>>>>> builder.addStore(new MyStoreBuilder(storeName));
>>>>>>>>>> stream1.transform(new MyTransformerSupplier(storeName));
>>>>>>>>>> // following my proposal, the store name can be passed in and used both for
>>>>>>>>>> // `listStores` and in `Transformer#init`, so the Transformer function does
>>>>>>>>>> // not need to get the constant string name again.
>>>>>>>>>>
>>>>>>>>>> // one caveat to admit is that MyTransformerSupplier logic may be unique to
>>>>>>>>>> // `store1`, so it cannot be reused with a different store name anyway.
>>>>>>>>>> -----------
>>>>>>>>>>
>>>>>>>>>> While in the latter:
>>>>>>>>>>
>>>>>>>>>> -----------
>>>>>>>>>> stream1.transform(new MyTransformerSupplierForStore1());
>>>>>>>>>> // the name just indicates that we may have one such supplier for each store.
>>>>>>>>>> -----------
>>>>>>>>>>
>>>>>>>>>> I understand that the latter makes the API more convenient, but the cost is that we still cannot completely get rid of `builder.addStore`; we can only reduce its semantic scope to shared state stores. Hence, users need to learn two ways of creating state stores for those two patterns.
>>>>>>>>>>
>>>>>>>>>> My argument is that more public APIs mean a longer learning curve for users and introduce more usage patterns that may confuse them (the proposal I had tries to replace one with the other completely).
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks for the great thoughts, Matthias and Guozhang!
>>>>>>>>>>
>>>>>>>>>> If I'm not mistaken, Guozhang's suggestion is what my second alternative on the KIP is ("Have the added method on the Supplier interfaces only return store names, not builders").
I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StoreBuilder.

I'm really intrigued by Matthias's idea that forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea and, I think, a cleaner API than what I proposed. The non-shared store use case is handled well here, and the shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to me).

We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for adding a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added.
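The identity-based rule can be made concrete with a small, self-contained Java sketch. It deliberately does not use the real Kafka Streams classes: `StoreBuilder` and `IdempotentTopology` below are hypothetical stand-ins, and the behavior shown (re-adding the same builder instance is a no-op, while a different builder under an already-registered name is rejected) is only the rule proposed in this email, not a description of any released `Topology#addStateStore`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a store builder: only the store name matters here.
final class StoreBuilder {
    private final String name;

    StoreBuilder(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Hypothetical, heavily simplified topology that only tracks stores and connections.
final class IdempotentTopology {
    private final Map<String, StoreBuilder> stores = new HashMap<>();
    private final Map<String, Set<String>> connections = new HashMap<>();

    // Proposed rule: re-adding the *same* builder instance is a no-op, while a
    // *different* builder registered under an existing name is still rejected.
    void addStateStore(StoreBuilder builder, String... processorNames) {
        StoreBuilder existing = stores.get(builder.name());
        if (existing != null && existing != builder) { // identity check, not equals()
            throw new IllegalStateException(
                "A different StoreBuilder has already been added with the name '"
                    + builder.name() + "'");
        }
        stores.putIfAbsent(builder.name(), builder);
        for (String processor : processorNames) {
            connections.computeIfAbsent(builder.name(), k -> new HashSet<>()).add(processor);
        }
    }

    int storeCount() {
        return stores.size();
    }

    Set<String> processorsFor(String storeName) {
        return connections.getOrDefault(storeName, Collections.emptySet());
    }
}
```

Two suppliers that hand back the same `StoreBuilder` instance can both be registered without a "store already added" failure, while accidentally constructing a second builder with the same name still fails immediately, which is where a clear error message would matter.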
Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most at least (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety, but a good error message should ease the pain, and it would happen immediately during development.

And with regards to Matthias's comment about whether we need to deprecate existing varargs transform methods: I don't think we need to, but it might be nice for there to be only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.

Paul

On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:

Guozhang,

> Regarding the last option to catch "store exist already" exception and
> fallback to connect stores, I'm a bit concerned it may be hiding actual
> user bugs.

I agree with this concern.
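The bug-hiding risk of catching the "store exists already" exception is easiest to see in code. The sketch below uses hypothetical stand-in types (`StoreBuilder`, `StrictTopology`, and a `FallbackDsl.addOrConnect` helper), not the real Kafka Streams API. It assumes a topology that rejects a second registration of any store name, and shows how catching that rejection and falling back to a plain connect lets a second, differently configured builder with the same name pass silently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: `config` distinguishes two different stores that share a name.
final class StoreBuilder {
    final String name;
    final String config;

    StoreBuilder(String name, String config) {
        this.name = name;
        this.config = config;
    }
}

// Hypothetical topology that rejects a second add of any store name.
final class StrictTopology {
    final Map<String, StoreBuilder> stores = new HashMap<>();
    final Map<String, List<String>> connections = new HashMap<>();

    void addStateStore(StoreBuilder builder, String processorName) {
        if (stores.containsKey(builder.name)) {
            throw new IllegalStateException("store already added: " + builder.name);
        }
        stores.put(builder.name, builder);
        connect(processorName, builder.name);
    }

    void connect(String processorName, String storeName) {
        connections.computeIfAbsent(storeName, k -> new ArrayList<>()).add(processorName);
    }
}

final class FallbackDsl {
    // Catch-and-fall-back: try to add the store; if the name is taken, just connect.
    static void addOrConnect(StrictTopology topology, StoreBuilder builder, String processorName) {
        try {
            topology.addStateStore(builder, processorName);
        } catch (IllegalStateException alreadyAdded) {
            // Nothing here checks that `builder` matches the store already registered.
            topology.connect(processorName, builder.name);
        }
    }
}
```

Calling `addOrConnect` with an `"in-memory"` builder and then a `"persistent"` builder, both named `"counts"`, wires both processors to the first store and drops the second configuration without any error: exactly the kind of hidden user bug being discussed here.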
From my original email:

> The only disadvantage I see, might be potential bugs about sharing state
> if two different stores are named the same by mistake (this would not be
> detected).

For your new proposal: I am not sure if it addresses Paul's original idea (I hope Paul can clarify). From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.

-Matthias

On 12/15/18 1:40 AM, Guozhang Wang wrote:

Matthias,

Thanks for your feedback.

Regarding the last option to catch "store exist already" exception and fallback to connect stores, I'm a bit concerned it may be hiding actual user bugs.

Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e.
we still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of state stores that it needs, i.e.:

    public interface TransformerSupplier<K, V, R> {
        Transformer<K, V, R> get();

        // names of the (already added) state stores this transformer needs
        default List<String> stateStoreNames() {
            return Collections.emptyList();
        }
    }

By doing this, users can still "consolidate" the references of store names in a single place in the transform call, e.g.:

    public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
        private final String storeName;

        public MyTransformerSupplier(String storeName) {
            this.storeName = storeName;
        }

        public class MyTransformer implements Transformer<K, V, R> {
            ....
            @Override
            public void init(ProcessorContext context) {
                store = context.getStateStore(storeName);
            }
        }

        @Override
        public Transformer<K, V, R> get() {
            return new MyTransformer();
        }

        @Override
        public List<String> stateStoreNames() {
            return Collections.singletonList(storeName);
        }
    }

Basically, we move the parameters from the caller of `transform` to inside the TransformerSuppliers. DSL implementations would not change much, simply calling `connectStateStore` with the list of names obtained from the provided function.

Guozhang

On Thu, Dec 13, 2018 at 7:27 AM Matthias J.
Sax <matth...@confluent.io> wrote:

Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?

The last argument is a vararg, and thus, just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?

Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()`; users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once, and for this case, we require that users don't provide store names in `transform()`.

Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows us to "auto-wire" a transformer to existing stores (to avoid the issue of adding the same store multiple times).

Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
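A hypothetical sketch of the two-interface idea follows. The interface names `StoreBuilderSupplier` and `StoreNameSupplier` and their `stateStores()` methods come from the email; everything else (`Transformer`, `TransformerSupplier`, `StoreBuilder`, and the toy `MiniDsl.transform` standing in for `KStream#transform`) is a simplified, invented stand-in. It makes the ordering constraint concrete: the supplier that owns the builder must be passed to `transform()` before any supplier that only names the store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the Kafka Streams types.
interface Transformer {
    String transform(String value);
}

interface TransformerSupplier {
    Transformer get();
}

final class StoreBuilder {
    final String name;

    StoreBuilder(String name) {
        this.name = name;
    }
}

// The two interfaces from the email. As written there, both declare stateStores()
// with different return types, so one class cannot implement both.
interface StoreBuilderSupplier {
    List<StoreBuilder> stateStores();
}

interface StoreNameSupplier {
    List<String> stateStores();
}

// Toy stand-in for the DSL's transform(): adds owned stores, connects named ones.
final class MiniDsl {
    final Map<String, StoreBuilder> stores = new HashMap<>();
    final Map<String, List<String>> connections = new HashMap<>();

    void transform(TransformerSupplier supplier, String processorName) {
        if (supplier instanceof StoreBuilderSupplier) {
            for (StoreBuilder builder : ((StoreBuilderSupplier) supplier).stateStores()) {
                if (stores.putIfAbsent(builder.name, builder) != null) {
                    throw new IllegalStateException("store already added: " + builder.name);
                }
                connect(processorName, builder.name);
            }
        }
        if (supplier instanceof StoreNameSupplier) {
            for (String name : ((StoreNameSupplier) supplier).stateStores()) {
                if (!stores.containsKey(name)) {
                    throw new IllegalStateException("store not added yet: " + name);
                }
                connect(processorName, name);
            }
        }
    }

    private void connect(String processorName, String storeName) {
        connections.computeIfAbsent(storeName, k -> new ArrayList<>()).add(processorName);
    }
}

// The "main" transformer owns the shared store's builder...
final class MainSupplier implements TransformerSupplier, StoreBuilderSupplier {
    public Transformer get() {
        return value -> value.toUpperCase();
    }

    public List<StoreBuilder> stateStores() {
        return Collections.singletonList(new StoreBuilder("shared"));
    }
}

// ...and the other transformers only name it.
final class FollowerSupplier implements TransformerSupplier, StoreNameSupplier {
    public Transformer get() {
        return value -> value.toLowerCase();
    }

    public List<String> stateStores() {
        return Collections.singletonList("shared");
    }
}
```

Passing `MainSupplier` and then `FollowerSupplier` connects both processors to `"shared"`; reversing the order fails because the follower refers to a store that has not been added yet, which is the subtlety Paul points out.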
Another possibility to avoid the issue of adding the same stores multiple times would be that the DSL always calls `addStateStore()` but catches a potential "store exists already" exception and falls back to `connectProcessorAndStateStore()` for this case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either. The only disadvantage I see might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).

Just some ideas I wanted to share. What do you think?

-Matthias

On 12/11/18 3:46 AM, Paul Whalen wrote:

Ah yes of course, this was an oversight: I completely ignored the case of multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself, and I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible.
Here's a gist showing the rough structure, just for context:

https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65

Note how it adds the stores to the topology, as well as providing a public method with the store names.

I don't think my proposal completely conflicts with the use case of multiple processors sharing state stores, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So for your example, if there was a single TransformerSupplier that was passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.

Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
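The enclosing-class pattern might look roughly like the following. This is not the contents of the linked gist: it is a hypothetical, self-contained sketch in which `MiniTopology`, `MiniProcessor` and `InventoryProcessors` are invented stand-ins, and a plain `Map` plays the role of a key-value store. The enclosing class is the single place that registers the shared store and exposes its name, and every processor it hands out reads and writes that same store.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topology: store name -> key/value store contents.
final class MiniTopology {
    private final Map<String, Map<String, Integer>> stores = new HashMap<>();

    // Like the topology described in this thread, a store name may be added only once.
    void addStateStore(String name) {
        if (stores.putIfAbsent(name, new HashMap<>()) != null) {
            throw new IllegalStateException("store already added: " + name);
        }
    }

    Map<String, Integer> getStateStore(String name) {
        Map<String, Integer> store = stores.get(name);
        if (store == null) {
            throw new IllegalStateException("no such store: " + name);
        }
        return store;
    }
}

// Hypothetical processor stand-in.
interface MiniProcessor {
    void process(String key, int amount);
}

// Enclosing class that owns the shared store: it registers the store exactly once,
// exposes the store names, and hands out processors that all use the same store.
final class InventoryProcessors {
    static final String STORE_NAME = "inventory";

    void addStoresTo(MiniTopology topology) {
        topology.addStateStore(STORE_NAME);
    }

    List<String> storeNames() {
        return Collections.singletonList(STORE_NAME);
    }

    MiniProcessor restockProcessor(MiniTopology topology) {
        Map<String, Integer> store = topology.getStateStore(STORE_NAME);
        return (key, amount) -> store.merge(key, amount, Integer::sum);
    }

    MiniProcessor sellProcessor(MiniTopology topology) {
        Map<String, Integer> store = topology.getStateStore(STORE_NAME);
        return (key, amount) -> store.merge(key, -amount, Integer::sum);
    }
}
```

Because registration lives in one place, the once-per-name restriction is not hit by accident; calling `addStoresTo` a second time still fails, mirroring the duplicate `addStateStore()` problem described above.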
Paul

On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Paul,

Thanks for the great writeup (very detailed and crystal-clear motivation sections!).

This is quite an interesting idea and I do like the API cleanness you proposed. The original motivation of letting StreamsTopology add state stores, though, is to allow different processors to share the state store. For example:

    builder.addStore("store1");

    // a path of stream transformations that leads to KStream stream1.
    stream1.transform(..., "store1");

    // another path that generates a KStream stream2.
    stream2.transform(..., "store1");

Behind the scenes, Streams will make sure the stream1 / stream2 transformations are always grouped together as a single group of tasks, each of which will be executed by a single thread, and hence there are no concurrency issues in accessing the store from different operators within the same task.
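The grouping Guozhang describes can be illustrated with a union-find over processor names: any two processors connected to the same store are merged into one group, so they would be assigned to the same tasks. This is a simplified, hypothetical model of the idea, not Kafka Streams' actual topology-grouping code; only the shared-store relationship is modeled here, and the processor and store names are made up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal union-find keyed by processor name.
final class ProcessorGroups {
    private final Map<String, String> parent = new HashMap<>();

    private String find(String node) {
        parent.putIfAbsent(node, node);
        String root = node;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        // Path compression: point every node on the path directly at the root.
        String current = node;
        while (!current.equals(root)) {
            String next = parent.get(current);
            parent.put(current, root);
            current = next;
        }
        return root;
    }

    void union(String a, String b) {
        String rootA = find(a);
        String rootB = find(b);
        if (!rootA.equals(rootB)) {
            parent.put(rootA, rootB);
        }
    }

    boolean sameGroup(String a, String b) {
        return find(a).equals(find(b));
    }

    // Processors that share at least one store end up in the same group.
    static ProcessorGroups fromStoreConnections(Map<String, List<String>> storesByProcessor) {
        ProcessorGroups groups = new ProcessorGroups();
        Map<String, String> firstUserOfStore = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : storesByProcessor.entrySet()) {
            String processor = entry.getKey();
            groups.find(processor); // also registers processors that use no store
            for (String store : entry.getValue()) {
                String first = firstUserOfStore.putIfAbsent(store, processor);
                if (first != null) {
                    groups.union(first, processor);
                }
            }
        }
        return groups;
    }
}
```

With `stream1-transform` and `stream2-transform` both connected to `"store1"`, the two end up in one group, while a processor using only `"store2"` stays separate.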
I'm not sure how common this use case is, but I'd like to hear if you have any thoughts on maintaining it, since the current proposal seems to exclude this possibility.

Guozhang

On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:

Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Thanks!
Paul

--
-- Guozhang