Thanks Paul! I agree with all of that. If we think that the general design is good, refactoring a PR should not be too much additional work (hopefully) if we want to pick a different name. Thus, if you want to open a WIP PR, we can use it to nail down the open details, which might help us reach a good conclusion.

>> 2) Default method vs new interface:

This seems to be the hardest tradeoff. I see the point about discoverability... It might be good to get input from others on which version they would prefer. Just to make clear, my suggestion from the last email was that `TransformerSupplier` etc. does not extend the new interface. Instead, a user who wants to use this feature would need to implement both interfaces. If `TransformerSupplier extends StoreProvider` (just picking a name here) without a default implementation, existing code would break. That is not an option because it breaks backward compatibility.

-Matthias

On 4/28/19 8:37 PM, Paul Whalen wrote:
> Great thoughts Matthias, thanks! I think we're all agreed that naming and documentation/education are the biggest hurdles for this KIP. In light of that, I think it makes sense for me to take a stab at a full-fledged PR with documentation, to convince us that it's possible to do it with enough clarity.
>
> In response to your specific thoughts:
>
> 1) StateStoreConnector as a name: Really good point about defining the difference between "adding" and "connecting." Guozhang suggested StateStoreConnector, which was definitely an improvement over my StateStoresSupplier, but I think you're right that we need to be careful to make it clear that it's really accomplishing both. Thinking about it now, one problem with Connector is that the implementer of the interface is not really doing any connecting; it's providing/supplying the store that will be both added and connected. StoreProvider seems reasonable to me and probably the best candidate at the moment. Still, it would be nice if the name could convey that it provides the store specifically so the caller can add it to the topology and connect it to the associated transformer.
>
> In general I think that clearly calling out what "adding" versus "connecting" means in the documentation will help make the entire purpose of this feature clearer to the user.
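The "adding" versus "connecting" distinction and the standalone opt-in interface can be illustrated with a minimal sketch. All types here are hypothetical toy stand-ins (`ToyStoreBuilder`, `ToyTransformerSupplier`, `ToyTopology`, `CountingSupplier`). `StoreProvider` is only the candidate name from this thread, not an existing Kafka Streams interface. The sketch assumes that the DSL auto-adds and auto-connects the stores of any supplier implementing the extra interface, and that it rejects calls which also pass explicit store names.

```java
import java.util.*;

// Toy stand-ins, NOT the Kafka Streams API: only what is needed to show the wiring.
interface ToyStoreBuilder { String name(); }

interface ToyTransformerSupplier { Runnable get(); }

// Hypothetical standalone opt-in interface (name from the thread's StoreProvider idea).
// No default method: a supplier opts in only by implementing it explicitly.
interface StoreProvider { Set<ToyStoreBuilder> stores(); }

final class ToyTopology {
    final Map<String, ToyStoreBuilder> added = new LinkedHashMap<>();
    final Map<String, Set<String>> connected = new LinkedHashMap<>();

    // "adding": the store becomes part of the topology, but is still dangling
    void addStateStore(ToyStoreBuilder builder) {
        if (added.putIfAbsent(builder.name(), builder) != null) {
            throw new IllegalStateException("store already added: " + builder.name());
        }
    }

    // "connecting": a specific processor is allowed to use an added store
    void connect(String processor, String storeName) {
        connected.computeIfAbsent(processor, k -> new LinkedHashSet<>()).add(storeName);
    }

    // What a DSL transform() could do with either wiring style.
    void transform(String processor, ToyTransformerSupplier supplier, String... storeNames) {
        if (supplier instanceof StoreProvider) {
            if (storeNames.length > 0) {
                throw new IllegalArgumentException("do not mix StoreProvider with explicit store names");
            }
            for (ToyStoreBuilder b : ((StoreProvider) supplier).stores()) {
                addStateStore(b);             // auto add ...
                connect(processor, b.name()); // ... and auto connect
            }
        } else {
            for (String name : storeNames) {
                connect(processor, name);     // old way: the user added the store manually
            }
        }
    }
}

// A supplier that opts in by implementing both interfaces.
final class CountingSupplier implements ToyTransformerSupplier, StoreProvider {
    private final ToyStoreBuilder counts = () -> "counts";

    @Override
    public Runnable get() {
        return () -> { /* transformer logic would go here */ };
    }

    @Override
    public Set<ToyStoreBuilder> stores() {
        return Collections.singleton(counts);
    }
}
```

The opt-in interface has no default method, so existing suppliers keep their current behavior. Returning a `Set` means the same builder appears at most once per supplier.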
>
> 2) Default method vs new interface: The choice of a default method was influenced by Guozhang's fear about API bloat/discoverability. I can definitely see it both ways. Would the separate interface be a sub-interface of Processor/TransformerSupplier or standalone? It seems like you're suggesting standalone, and I think that's what I favor. My only concern is that the interface wouldn't actually appear as a type in any public API signature, which sort of hurts discoverability. You would have to read the javadocs for stream.process/transform() to discover that implementing the interface in addition to Processor/TransformerSupplier would add and connect the store for you. But that added burden probably helps us make sure people don't mix and match, like you said.
>
> 3) Returning null instead of empty: Seems fair to me. I always worry about returning null when an empty collection can be used instead. But given that the library is the caller rather than the client, I think your point makes sense here.
>
> 4) Returning Set instead of Collection: Agreed, I don't see why not to make it more specific.
>
> Paul
>
> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi, sorry for the long pause. Just trying to catch up here.
>>
>> I think it is safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded", and thus it's not really possible to use the same `StoreBuilder` object to create different stores.
>>
>> I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.
>>
>> Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.
>>
>> Some further comments:
>>
>> (1) I am not sure if `StateStoreConnector` is the best name for the new interface.
>> Note that there are two concepts about stores:
>>
>> - adding a store: this makes the store available in the topology in general (however, the store is still "dangling" and not used)
>> - connecting a store: this allows a processor etc. to use a store
>>
>> The new interface does both, but its name only indicates the second part, which might be confusing. It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with the new "auto add+connect" pattern. If the new interface name indicates only the connect part, users might think they need to add stores manually and can connect them automatically.
>>
>> Unfortunately, I don't have a much better suggestion for a name either. The only idea that came to my mind was `StoreProvider`: to me, a provider is a "service" interface that does work for us, i.e., it adds and connects a store. I'm not sure if this is too subtle, considering that there is already a `StoreSupplier` interface.
>>
>> But maybe somebody else has a good idea for how to improve the name.
>>
>> In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.
>>
>> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc. and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature. I wonder if it might be better not to add the new interface to `ProcessorSupplier` etc., and instead to provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches.
>> Think: I only override a default method, and my code breaks.
>>
>> Thoughts?
>>
>> (3) If we keep the current default implementation for the new method, should it return `null` instead of an empty collection? That might make it safer to detect bugs in user code where an empty collection is returned by accident.
>>
>> (4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly (i.e., returning the same `StoreBuilder` multiple times is idempotent, and one cannot add+connect to it twice)?
>>
>> -Matthias
>>
>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>> Ivan and Guozhang,
>>>
>>> Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.
>>>
>>> I will say that I'm not totally sure that most use cases of transform() use just one state store. It's hard to know since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low-level processor API when we need that complexity. It's somewhat hard to imagine many use cases that need only one state store, since the high-level DSL can usually accomplish those tasks. The example Ivan presented, for instance, looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what other usages you're imagining.
>>>
>>> That being said, perhaps the Processor API should really be considered a separate paradigm in Streams, not just a lower level that we reach for when necessary. In that case, it would be beneficial to make the simple use cases easier.
>>> I've definitely talked about this with my own team: if you're less familiar with the kind of functional style that the high-level DSL offers, it might be easier to "see" your state and interact with it directly.
>>>
>>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. There seems to be at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.
>>>
>>> Paul
>>>
>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>
>>>> Hi all!
>>>>
>>>> I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.
>>>>
>>>> I fully agree with the motivation part of KIP-401. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from the business logic. Of course, this should be handled better in the KStreams API.
>>>>
>>>> But I was going to look at this problem from another side and propose a simple alternative in the high-level DSL. It would not fit all cases, but it would fit most of them. Hence my idea does not exclude Paul's proposal.
>>>>
>>>> What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:
>>>>
>>>>     stream
>>>>         .map(...)
>>>>         .filter(...)
>>>>         .transform((k, v, s) -> {....}, Transformed.with(....))
>>>>
>>>> where
>>>> * k, v -- input key & value
>>>> * s -- a KeyValueStore provided as an argument
>>>> * the return value of the lambda should be KeyValue.pair(...)
>>>> * Transformed.with... is a builder, used to define the Transformer and KeyValueStore building parameters.
>>>>   Some of these parameters should be:
>>>> ** the store's KeySerde,
>>>> ** the store's ValueSerde,
>>>> ** whether the store is persistent or in-memory,
>>>> ** the store's name -- an optional parameter; the system should be able to devise the store name transparently for the user, unless we want to choose it ourselves or share the store between processors,
>>>> ** scheduled punctuation.
>>>>
>>>> Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.
>>>>
>>>> This could be achieved as simply as:
>>>>
>>>>     stream.transform((key, value, stateStore) -> {
>>>>                 int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>                 stateStore.put(key, value);
>>>>                 return KeyValue.pair(key, value - previousValue);
>>>>             },
>>>>             // we do not need to bother with store name, punctuation etc.
>>>>             // maybe even the Serde part can be omitted, since we can inherit
>>>>             // the serdes from the stream by default
>>>>             Transformed.with(Serdes.String(), Serdes.Integer()));
>>>>
>>>> The hard part is that the new `transform` method definition would need six type parameters:
>>>>
>>>> * input/output/KeyValueStore key type,
>>>> * input/output/KeyValueStore value type.
>>>>
>>>> However, it seems that all these types can be inferred from the provided lambda and the Transformed.with instances.
>>>>
>>>> What do you think about this?
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>> On 27.03.2019 20:45, Guozhang Wang wrote:
>>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.
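Ivan's derivative example can be simulated outside Kafka Streams to check the intended per-key semantics. This is a sketch under stated assumptions: a `HashMap` stands in for the `KeyValueStore` and `Map.Entry` stands in for `KeyValue`. `DerivativeSketch`, `StoreLambda` and `run()` are hypothetical helpers, not the proposed `transform`/`Transformed.with` API.

```java
import java.util.*;

// Hypothetical simulation of Ivan's proposed transform((key, value, store) -> ...) lambda.
// A HashMap stands in for the KeyValueStore; Map.Entry stands in for KeyValue.
final class DerivativeSketch {
    @FunctionalInterface
    interface StoreLambda<K, V, R> {
        R apply(K key, V value, Map<K, V> store);
    }

    // Feeds records one at a time through the lambda, sharing a single store.
    static <K, V, R> List<R> run(List<Map.Entry<K, V>> records, StoreLambda<K, V, R> fn) {
        Map<K, V> store = new HashMap<>();
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            out.add(fn.apply(record.getKey(), record.getValue(), store));
        }
        return out;
    }

    // Emits value - previousValue per key, treating a missing previous value as 0.
    static List<Map.Entry<String, Integer>> deltas(List<Map.Entry<String, Integer>> records) {
        return run(records, (key, value, store) -> {
            int previousValue = Optional.ofNullable(store.get(key)).orElse(0);
            store.put(key, value);
            return new AbstractMap.SimpleEntry<String, Integer>(key, value - previousValue);
        });
    }
}
```

For the input records (a, 5), (a, 8), (b, 3), (a, 6), the sketch emits the deltas 5, 3, 3 and -2. A key seen for the first time is compared against 0.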
>>>>
>>>> Overall, I think I agree with you that passing in the StoreBuilder directly, rather than the store name, is more convenient because it does not require another `addStore` call. We just need to spend more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this learning curve, but I can be convinced if most people feel it is worthwhile.
>>>>
>>>> Guozhang
>>>>
>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>> I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:
>>>>
>>>> - Will there be a new, separate interface for users to implement for the new functionality? No. To hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier. Users opt in to this functionality by overriding the default implementation, which returns an empty list.
>>>> - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.
>>>> - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores.
>>>>   But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
>>>> - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience. It will allow adding the same StoreBuilder multiple times, as long as the exact same StoreBuilder instance is being added for the same store name. In practice, this seems to prevent accidentally merging two intended state stores into one by adding them under the same name. For additional safety (not in the proof of concept), we could allow this relaxation only for internal callers of topology.addStateStore().
>>>>
>>>> So, in summary, the use cases look like:
>>>>
>>>> - 1 transformer/processor that owns its store: it uses the new StateStoresSupplier interface method to supply its StoreBuilders, which will be added to the topology automatically.
>>>> - Multiple transformers/processors that share the same store: either
>>>>   1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations, and is added to the topology manually by the user.
>>>>   2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it.
>>>>
>>>> This makes the KIP wiki a bit stale; I'll update it if we want to bring this design to a vote.
>>>>
>>>> Thanks!
>>>> Paul
>>>>
>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Matthias / Paul,
>>>>
>>>> My concern about introducing `StoreBuilderSupplier` is simply that it is another XXSupplier in the public API, so I'd like to ask if we really have to add it :)
>>>>
>>>> The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:
>>>>
>>>> -----------
>>>>
>>>>     String storeName = "store1";
>>>>     builder.addStore(new MyStoreBuilder(storeName));
>>>>     stream1.transform(new MyTransformerSupplier(storeName));
>>>>     // following my proposal, the store name can be passed in and used both for
>>>>     // `listStores` and in `Transformer#init`, so the Transformer function
>>>>     // does not need to get the constant string name again.
>>>>
>>>>     // one caveat to admit is that the MyTransformerSupplier logic may be unique
>>>>     // to `store1`, so it cannot be reused with a different store name anyway.
>>>>
>>>> -----------
>>>>
>>>> While in the latter:
>>>>
>>>> -----------
>>>>
>>>>     stream1.transform(new MyTransformerSupplierForStore1());
>>>>     // the name just indicates that we may have one such supplier for each store.
>>>>
>>>> -----------
>>>>
>>>> I understand the latter makes the API more convenient. The cost is that we still cannot completely remove `builder.addStore`, but only reduce its semantic scope to shared state stores. Hence users need to learn two ways of creating state stores for those two patterns.
>>>>
>>>> My argument is that more public APIs mean a longer learning curve for users and introduce more usage patterns that may confuse them (the proposal I had tries to replace one completely with the other).
>>>>
>>>> Guozhang
>>>>
>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>> Thanks for the great thoughts, Matthias and Guozhang!
>>>>
>>>> If I'm not mistaken, Guozhang's suggestion is my second alternative on the KIP ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own. But to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StateStoreBuilder.
>>>>
>>>> I'm really intrigued by Matthias's idea, which forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea and, I think, a cleaner API than what I proposed. The non-shared store use case is handled well here. The shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems too subtle to me).
>>>>
>>>> We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky. Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name is the identical object to the one that has already been added.
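Paul's identity-based rule for `addStateStore()`, and the "new way" of sharing a store by returning the same builder instance from several suppliers, can be modeled in a few lines. The classes below (`SketchStoreBuilder`, `RelaxedTopology`) are hypothetical and only mimic the behavior described in the thread. They assume that object identity, not name or configuration equality, decides whether a repeated add is allowed.

```java
import java.util.*;

// Hypothetical stand-in for a StoreBuilder; NOT the Kafka class.
final class SketchStoreBuilder {
    final String name;

    SketchStoreBuilder(String name) {
        this.name = name;
    }
}

// Hypothetical topology that applies the relaxed, identity-based add rule.
final class RelaxedTopology {
    private final Map<String, SketchStoreBuilder> stores = new HashMap<>();
    private final Map<String, Set<String>> connected = new HashMap<>();

    void addStateStore(SketchStoreBuilder builder) {
        SketchStoreBuilder existing = stores.putIfAbsent(builder.name, builder);
        // Identity (==) on purpose: two distinct builders with the same name still fail.
        if (existing != null && existing != builder) {
            throw new IllegalStateException("another StoreBuilder already added for " + builder.name);
        }
    }

    // What the DSL could do for each supplier that returns builders: add (idempotently), then connect.
    void addAndConnect(String processor, Collection<SketchStoreBuilder> builders) {
        for (SketchStoreBuilder builder : builders) {
            addStateStore(builder);
            connected.computeIfAbsent(builder.name, k -> new TreeSet<>()).add(processor);
        }
    }

    Set<String> processorsUsing(String storeName) {
        return connected.getOrDefault(storeName, Collections.emptySet());
    }

    int storeCount() {
        return stores.size();
    }
}
```

With two suppliers returning the same `shared` instance, this model ends up with one store connected to both transformers. A second, distinct builder that is also named `store1` is still rejected.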
>>>>
>>>> Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most of them at least (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety. But a good error message should ease the pain, and the error would surface immediately during development.
>>>>
>>>> And regarding Matthias's comment about whether we need to deprecate the existing varargs transform methods: I don't think we need to, but it might be nice for there to be only one way to do things, assuming whatever we come up with supports all existing use cases. I don't feel strongly about this. But if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.
>>>>
>>>> Paul
>>>>
>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Guozhang,
>>>>
>>>> > Regarding the last option to catch the "store exists already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>
>>>> I agree with this concern. From my original email:
>>>>
>>>> > The only disadvantage I see might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).
>>>>
>>>> For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful.
>>>> Your proposal falls a little short of supporting encapsulation for non-shared stores.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>
>>>> Matthias,
>>>>
>>>> Thanks for your feedback.
>>>>
>>>> Regarding the last option to catch the "store exists already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>
>>>> Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere between your approaches. We still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of the state stores it needs, i.e.:
>>>>
>>>>     public interface TransformerSupplier<K, V, R> {
>>>>         Transformer<K, V, R> get();
>>>>
>>>>         default List<String> stateStoreNames() {
>>>>             return Collections.emptyList();
>>>>         }
>>>>     }
>>>>
>>>> By doing this, users can still "consolidate" the references to store names in a single place in the transform call, e.g.:
>>>>
>>>>     public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
>>>>         private final String storeName;
>>>>
>>>>         public MyTransformerSupplier(String storeName) {
>>>>             this.storeName = storeName;
>>>>         }
>>>>
>>>>         public class MyTransformer implements Transformer<K, V, R> {
>>>>             private StateStore store;
>>>>
>>>>             @Override
>>>>             public void init(ProcessorContext context) {
>>>>                 store = context.getStateStore(storeName);
>>>>             }
>>>>             // .... transform() and close()
>>>>         }
>>>>
>>>>         @Override
>>>>         public Transformer<K, V, R> get() {
>>>>             return new MyTransformer();
>>>>         }
>>>>
>>>>         @Override
>>>>         public List<String> stateStoreNames() {
>>>>             return Collections.singletonList(storeName);
>>>>         }
>>>>     }
>>>>
>>>> Basically, we move the parameters from the caller of `transform` into the TransformerSuppliers. DSL implementations would not change much; they would simply call `connectStateStore` with the list of names obtained from the provided function.
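Guozhang's name-only variant can be sketched with hypothetical stand-ins (`NamedStoresSupplier`, `NameWiringTopology`, `SingleStoreSupplier`). None of them are Kafka classes. Stores are still added explicitly, and the DSL side only connects the names a supplier reports. Treating an unknown name as an error is an assumption of this sketch.

```java
import java.util.*;

// Hypothetical stand-in for a supplier that lists store names (not the Kafka API).
interface NamedStoresSupplier<T> {
    T get();

    // Default keeps existing suppliers source-compatible: no stores to connect.
    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

final class NameWiringTopology {
    final Set<String> addedStores = new HashSet<>();
    final Map<String, List<String>> connections = new HashMap<>();

    void addStateStore(String storeName) {
        if (!addedStores.add(storeName)) {
            throw new IllegalStateException("store already added: " + storeName);
        }
    }

    // What a DSL transform() could do: connect each listed name, never add.
    void transform(String processor, NamedStoresSupplier<?> supplier) {
        for (String storeName : supplier.stateStoreNames()) {
            // In this sketch, connecting to a store that was never added is an error.
            if (!addedStores.contains(storeName)) {
                throw new IllegalStateException("store not added: " + storeName);
            }
            connections.computeIfAbsent(processor, k -> new ArrayList<>()).add(storeName);
        }
    }
}

// Hypothetical supplier that receives the store name once and reports it back.
final class SingleStoreSupplier implements NamedStoresSupplier<Runnable> {
    private final String storeName;

    SingleStoreSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Runnable get() {
        return () -> { /* the transformer's init() would look up storeName here */ };
    }

    @Override
    public List<String> stateStoreNames() {
        return Collections.singletonList(storeName);
    }
}
```

Because the supplier only reports names, a shared store is added once and can be connected to any number of transformers. The trade-off Matthias raises is that the `StoreBuilder` itself stays outside the supplier.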
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?
>>>>
>>>> The last argument is a vararg. Thus, just keeping the existing API for this part seems to work too, and would allow both patterns to be implemented.
>>>>
>>>> Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with a method `List<StoreBuilder> stateStores()`. Users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once, and in that case we require that users don't provide store names in `transform()`.
>>>>
>>>> Similarly, we could add an interface `StoreNameSupplier` with a method `List<String> stateStores()`. This allows us to "auto-wire" a transformer to existing stores (to avoid the issue of adding the same store multiple times).
>>>>
>>>> Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
>>>>
>>>> Another way to avoid adding the same stores multiple times would be for the DSL to always call `addStateStore()`, catch a potential "store exists already" exception, and fall back to `connectProcessorAndStateStore()` in that case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either.
>>>> The only disadvantage I see might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).
>>>>
>>>> Just some ideas I wanted to share. What do you think?
>>>>
>>>> -Matthias
>>>>
>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>>>
>>>> Ah yes, of course, this was an oversight: I completely ignored the case of multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself. I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible. Here's a gist showing the rough structure, just for context:
>>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>>> Note how it adds the stores to the topology, as well as providing a public method with the store names.
>>>>
>>>> I don't think my proposal completely conflicts with the use case of multiple processors sharing state stores, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So in your example, if a single TransformerSupplier were passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.
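Matthias's fallback idea from his 12/13 email can be sketched with hypothetical types to show the failure mode that both he and Guozhang raise. In that idea, the DSL always calls `addStateStore()`, catches the "store exists already" exception, and connects instead. `FallbackTopology`, its nested `Builder` and `addOrConnect()` are illustrative names only, not Kafka APIs.

```java
import java.util.*;

// Hypothetical sketch (not the Kafka API) of "always add, and on a name clash just connect".
final class FallbackTopology {
    static final class Builder {
        final String name;
        final String kind; // e.g. "persistent" or "in-memory", only to tell builders apart

        Builder(String name, String kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    final Map<String, Builder> stores = new HashMap<>();
    final Map<String, Set<String>> connections = new HashMap<>();

    void addStateStore(Builder builder) {
        if (stores.containsKey(builder.name)) {
            throw new IllegalStateException("store exists already: " + builder.name);
        }
        stores.put(builder.name, builder);
    }

    void addOrConnect(String processor, Builder builder) {
        try {
            addStateStore(builder);
        } catch (IllegalStateException alreadyAdded) {
            // Fallback: the order of transform() calls no longer matters, but a *different*
            // builder that accidentally reuses the name is silently ignored here.
        }
        connections.computeIfAbsent(builder.name, k -> new TreeSet<>()).add(processor);
    }
}
```

In the test scenario, both processors end up sharing the first (persistent) store, and the second (in-memory) builder is dropped without any error. This is the undetected naming bug described in the thread.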
>>>>
>>>> Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
>>>>
>>>> Paul
>>>>
>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the great writeup (very detailed and crystal-clear motivation sections!).
>>>>
>>>> This is quite an interesting idea, and I do like the API cleanness you proposed. The original motivation for letting StreamsTopology add state stores, though, is to allow different processors to share the state store. For example:
>>>>
>>>>     builder.addStore("store1");
>>>>
>>>>     // a path of stream transformations that leads to KStream stream1.
>>>>     stream1.transform(..., "store1");
>>>>
>>>>     // another path that generates a KStream stream2.
>>>>     stream2.transform(..., "store1");
>>>>
>>>> Behind the scenes, Streams will make sure the stream1 / stream2 transformations are always grouped together as a single group of tasks. Each task is executed by a single thread, so there are no concurrency issues when different operators within the same task access the store. I'm not sure how common this use case is, but I'd like to hear your thoughts on maintaining it, since the current proposal seems to exclude this possibility.
>>>>
>>>> Guozhang
>>>>
>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>> Here's KIP-401 for discussion. It is a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API.
>>>>
>>>> I have some code written, but I will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.
>>>>
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>
>>>> Thanks!
>>>> Paul
>>>>
>>>> --
>>>> -- Guozhang