It wasn't much of a lift changing option B to work for option C, so I
closed that PR and made a new one, which should be identical to the KIP
right now: https://github.com/apache/kafka/pull/6824.  There are a few
todos still which I will hold off until the KIP is accepted.

I created a voting thread about a month ago, so I'll bump that now that
we're nearly there.

Paul

On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com> wrote:

> Per Matthias's suggestion from a while ago, I actually implemented a good
> amount of option B to get a sense of the user experience and documentation
> requirements.  For a few reasons mentioned below, I think it's not my
> favorite option, and I prefer option C.  But since I did the work and it
> can help discussion, I may as well share:
> https://github.com/apache/kafka/pull/6821.
>
> Things I learned along the way implementing Option B:
>  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
> perfect but it seems to capture the general gist without being overly
> verbose.  I get that from a strict standpoint it's not "providing connected
> stores" but is instead "providing stores to be connected," but I think that
> in context and with documentation, the risk of someone being confused by
> that is low.
>  - I definitely felt the discoverability issue while trying to write clear
> documentation; you really have to make sure to connect the dots for the
> user when the interface isn't connected to anything.
>  - Another problem with a separate interface found while writing
> tests/examples: defining a ProcessorSupplier that also implements
> ConnectedStoreProvider cannot be done anonymously, since you can't define
> an anonymous class in Java that implements multiple interfaces.  I actually
> consider this a fairly major usability issue - it means a user always has
> to have a custom class rather than doing it inline.  We could provide an
> abstract class that implements the two, but at that point, we're not that
> far from option A or C anyway.
>
> I updated the KIP with my current thinking, which as mentioned is
> Matthias's option C.  Once again for clarity, that *is not* what is in the
> linked pull request.  The current KIP is my proposal.
>
> Thanks everyone for the input!
>
> P.S.  What do folks use to edit the HTML documentation, e.g.
> processor-api.html?  I looked at doing it by hand it but it kind of looked
> like agony with all the small tags required for formatting code, so I'm
> sort of assuming there's tooling for it.
>
> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I think the discussion mixed approaches a little bit, hence, let me
>> rephrase my understanding:
>>
>>
>> A) add new method with default implementation to `ProcessorSupplier`:
>>
>> For this case, we don't add a new interface, but only add a new method
>> to `ProcessorSupplier` -- to keep backward compatibility, we need to add
>> a default implementation. Users opt into the new feature by overwriting
>> the default implementation.
>>
>>
>> B) We add a new interface with new method:
>>
>> For this case, `ProcessorSupplier` interface is not changed and it does
>> also _not_ extend the new interface. Because `ProcessorSupplier` is not
>> changed, it's naturally backward compatible. Users opt into the new
>> feature, by adding the new interface to their ProcessorSupplier
>> implementation and they need to implement the new method because there
>> is no default implementation. Kafka Streams can use `instanceof` to
>> detect if the new interface is used or not and thus, to the right thing.
>>
>>
>> What was also discussed is a mix of both:
>>
>> C) We add a new interface with new method and let `ProcessorSupplier`
>> extend the new interface:
>>
>> Here, we need to add a default implementation to preserve backward
>> compatibility. Similar to (A), users opt into the feature by overwriting
>> the default implementation.
>>
>>
>>
>> Option (C) is the same as (A) from a user point of view because a user
>> won't care about the new interface. It only makes a difference for our
>> code base, as we can share the default implementation of the new method
>> This is only a small gain, as the implementation is trivial but also a
>> small drawback as we add new public interface that is useless to the
>> user because the user would never implement the interface directly.
>>
>>
>>
>> For (A/C), it might be simpler for users to detect the feature. For (B),
>> we have the advantage that users must implement the method if they use
>> the new interface.
>>
>> Overall, it seems that (A) might be the best choice because it makes the
>> feature easier discoverable and does not add a "useless" interface. If
>> you want to go with (C) to share the default implementation code, that's
>> also fine with me. I am convinced now (even if I brought it up), that
>> (B) might be not optimal because feature discoverability seems to be
>> important.
>>
>>
>>
>>
>> About `null` vs `emptyList`: I still tend to like `null` better but it's
>> really a detail and not too important. Note, that the question only
>> arises for (A/C), but not for (B) because for (B) we don't need a
>> default implementation.
>>
>>
>>
>>
>> @Paul: It's unclear to me atm what your final proposal is because you
>> mentioned that you might want to rename `StateStoreConnector`? It's also
>> unclear to me atm, if you prefer (A), (B), or (C).
>>
>> Maybe you can update the KIP if necessary and clearly state what you
>> final proposal is. Beside this, it seems we can move to a VOTE?
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>>
>> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
>> > Hi Paul,
>> >
>> > I will try to express myself a bit clearer.
>> >
>> > Ad 1)
>> > My assumption is that if `StateStoreConnector#stateStores()` returns
>> `null`
>> > Kafka Streams will throw an NPE because on purpose no null check is
>> > performed before the loop that calls `StreamsBuilder#addStateStore()`.
>> When
>> > the user finally understands the cause of the NPE, she knows that she
>> has
>> > to override `StateStoreConnector#stateStores()` in her implementation.
>> My
>> > question was, why let the user discover that she has to overwrite the
>> > method at runtime if you could not provide a default implementation for
>> > `StateStoreConnector#stateStores()` and let the compiler tell the user
>> the
>> > need to overwrite the method. Not providing a default implementation
>> > without separating the interfaces implies not being backward-compatible.
>> > That means, if we choose to not provide a default implementation and let
>> > the compiler signal the necessity to override the method, we have to
>> > separate the interfaces in any case.
>> >
>> > Ad 2)
>> > If you check for `null` or empty list in `process` and do not call
>> > `addStateStores` in those cases, the advantage of returning `null` to be
>> > saver to detect bugs as mentioned by Matthias would be lost. But maybe
>> I am
>> > missing something here.
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> >
>> > On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> wrote:
>> >
>> >> I definitely don't mind anyone jumping, Bruno, thanks for the comments!
>> >>
>> >> 1) I'm not totally sure I'm clear on your point, but I think we're on
>> the
>> >> same page - if we're adding a method to the XSupplier interfaces (by
>> making
>> >> them inherit from a super interface StateStoreConnector) then we
>> definitely
>> >> need a default implementation to maintain compatibility.  Whether the
>> >> default implementation returns null or an empty list is somewhat of a
>> >> detail.
>> >>
>> >> 2) If stream.process() sees that StateStoreConnector#stateStores()
>> returns
>> >> either null or an empty list, it would handle that case specifically
>> and
>> >> not try to call addStateStore at all.  Or is this not what you're
>> asking?
>> >>
>> >> Separately, I'm still hacking away at the details of the PR and will
>> >> continue to get something into a discussable state, but I'll share some
>> >> thoughts I've run into.
>> >>
>> >> A) I'm tentatively going the separate interface route (Matthias's
>> >> suggestion) and naming it ConnectedStoreProvider.  Still don't love the
>> >> name, but there's something nice about the name indicating *why* this
>> thing
>> >> is providing the store, not just that it is providing it.
>> >>
>> >> B) It has occurred to me that topology.addProcessor() could also
>> recognize
>> >> if ProcessorSupplier implements ConnectedStoreProvider and add and
>> connect
>> >> stores appropriately.  This isn't in the KIP and I think the value-add
>> is
>> >> lower (if you're reaching that low level, surely the "auto add/connect
>> >> store" isn't too important to you), but I think it would be a
>> confusing if
>> >> it didn't, and I don't see any real downside.
>> >>
>> >> Paul
>> >>
>> >> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> @Paul: Thank you for the KIP!
>> >>>
>> >>> I hope you do not mind that I jump in.
>> >>>
>> >>> I have the following comments:
>> >>>
>> >>> 1) `null` vs empty list in the default implementation
>> >>> IIUC, returning `null` in the default implementation should basically
>> >>> signal that the method `stateStores` was not overridden. Why then
>> >> provide a
>> >>> default implementation in the first place? Without default
>> implementation
>> >>> you would discover the missing implementation already at compile-time
>> and
>> >>> not only at runtime. If you decide not to provide a default
>> >> implementation,
>> >>> `XSupplier extends StateStoreConnector` would break existing code as
>> >>> Matthias has already pointed out.
>> >>>
>> >>> 2) `process` method adding the StoreBuilders to the topology
>> >>> If the default implementation returned `null` and `XSupplier extends
>> >>> StateStoreConnector`, then existing code would break, because
>> >>> `StreamsBuilder#addStateStore()` would throw a NPE.
>> >>>
>> >>> +1 for opening a WIP PR
>> >>>
>> >>> Best,
>> >>> Bruno
>> >>>
>> >>>
>> >>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <
>> matth...@confluent.io>
>> >>> wrote:
>> >>>
>> >>>> Thank Paul!
>> >>>>
>> >>>> I agree with all of that. If we think that the general design is
>> good,
>> >>>> refactoring a PR if we want to pick a different name should not be
>> too
>> >>>> much additional work (hopefully). Thus, if you want to open a WIP PR
>> >> and
>> >>>> we use it to nail the open details, it might help to find a good
>> >>>> conclusion.
>> >>>>
>> >>>>>> 2) Default method vs new interface:
>> >>>>
>> >>>> This seems to be the hardest tradeoff. I see the point about
>> >>>> discoveability... Might be good to get input from others, which
>> version
>> >>>> they would prefer.
>> >>>>
>> >>>> Just to make clear, my suggestion from the last email was, that
>> >>>> `Transformer` etc does not extend the new interface. Instead, a user
>> >>>> that want to use this feature would need to implement both
>> interfaces.
>> >>>>
>> >>>> If `Transformer extends StoreProvider` (just picking a name here)
>> >>>> without default implementation existing code would break and thus it
>> >> not
>> >>>> a an option because of breaking backward compatibility.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 4/28/19 8:37 PM, Paul Whalen wrote:
>> >>>>> Great thoughts Matthias, thanks! I think we're all agreed that
>> naming
>> >>> and
>> >>>>> documentation/education are the biggest hurdles for this KIP, and in
>> >>>> light
>> >>>>> of that, I think it makes sense for me to just take a stab at a full
>> >>>>> fledged PR with documentation to convince us that it's possible to
>> do
>> >>> it
>> >>>>> with enough clarity.
>> >>>>>
>> >>>>> In response to your specific thoughts:
>> >>>>>
>> >>>>> 1) StateStoreConnector as a name: Really good point about defining
>> >> the
>> >>>>> difference between "adding" and "connecting."  Guozhang suggested
>> >>>>> StateStoreConnector which was definitely an improvement over my
>> >>>>> StateStoresSupplier, but I think you're right that we need to be
>> >>> careful
>> >>>> to
>> >>>>> make it clear that it's really accomplishing both.  Thinking about
>> it
>> >>>> now,
>> >>>>> one problem with Connector is that the implementer of the interface
>> >> is
>> >>>> not
>> >>>>> really doing any connecting, it's providing/supplying the store that
>> >>> will
>> >>>>> be both added and connected.  StoreProvider seems reasonable to me
>> >> and
>> >>>>> probably the best candidate at the moment, but it would be nice if
>> >> the
>> >>>> name
>> >>>>> could convey that it's providing the store specifically so the
>> caller
>> >>> can
>> >>>>> add it to the topology and connect it to the associated transformer.
>> >>>>>
>> >>>>> In general I think that really calling out what "adding" versus
>> >>>>> "connecting" is in the documentation will help make the entire
>> >> purpose
>> >>> of
>> >>>>> this feature more clear to the user.
>> >>>>>
>> >>>>> 2) Default method vs new interface: The choice of a default method
>> >> was
>> >>>>> influenced by Guozhang's fear about API bloat/discoverability.  I
>> can
>> >>>>> definitely see it both ways   Would the separate interface be a
>> >>>>> sub-interface of Processor/TransformerSupplier or standalone?  It
>> >> seems
>> >>>>> like you're suggesting standalone and I think that's what I favor.
>> >> My
>> >>>> only
>> >>>>> concern there is that the interface wouldn't actually be a type to
>> >> any
>> >>>>> public API which sort of hurts discoverability.  You would have to
>> >> read
>> >>>> the
>> >>>>> javadocs for stream.process/transform() to discover that
>> implementing
>> >>> the
>> >>>>> interface in addition to Processor/TransformerSupplier would add and
>> >>>>> connect the store for you.  But that added burden actually probably
>> >>> helps
>> >>>>> us in terms of making sure people don't mix and match, like you
>> said.
>> >>>>>
>> >>>>> 3) Returning null instead of empty: Seems fair to me.  I always
>> worry
>> >>>> about
>> >>>>> returning null when an empty collection can be used instead, but
>> >> given
>> >>>> that
>> >>>>> the library is the caller rather than the client I think your point
>> >>> makes
>> >>>>> sense here.
>> >>>>>
>> >>>>> 4) Returning Set instead of Collection: Agreed, don't see why not to
>> >>> make
>> >>>>> it more specific.
>> >>>>>
>> >>>>> Paul
>> >>>>>
>> >>>>> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <
>> >> matth...@confluent.io
>> >>>>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi, sorry for the long pause. Just trying to catch up here.
>> >>>>>>
>> >>>>>> I think it save to allow `addStateStore()` to be idempotent for the
>> >>> same
>> >>>>>> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus
>> >>> it's
>> >>>>>> not really possible to use the same `StoreBuilder` object to create
>> >>>>>> different stores.
>> >>>>>>
>> >>>>>> I also agree with the concern, that only allowing a single store
>> (as
>> >>>>>> proposed by Ivan) might be too restrictive.
>> >>>>>>
>> >>>>>> Overall, the current KIP version LGTM. I don't have mayor concerns
>> >>> about
>> >>>>>> user education for this case, but I agree that we need to document
>> >>> this
>> >>>>>> clearly.
>> >>>>>>
>> >>>>>> Some further comments:
>> >>>>>>
>> >>>>>> (1) I am not sure if `StateStoreConnector` is the best name for the
>> >>> new
>> >>>>>> interface. Note, that there are two concepts about stores:
>> >>>>>>
>> >>>>>>  - adding a store: this makes the store available in the topology
>> in
>> >>>>>> general (however, the store is still "dangling", and not used)
>> >>>>>>  - connecting a store: this allows a processor etc to use a store
>> >>>>>>
>> >>>>>> The new interface does both, but its name only indicates that
>> second
>> >>>>>> part what might be confusing. It might be especially confusing
>> >> because
>> >>>>>> we want to disallow to mix the exiting "manually add and connect"
>> >>>>>> pattern, with a new pattern to "auto add+connect". If the new
>> >>> interface
>> >>>>>> name indicates the connect part only, user might think they need to
>> >>> add
>> >>>>>> stores manually and can connect automatically.
>> >>>>>>
>> >>>>>> Unfortunately, I don't have a much better suggestion for a name
>> >>> either.
>> >>>>>> The only idea that came to my mind was `StoreProvider`: to me, a
>> >>>>>> provider is a "service" interface that does work for us, ie, it
>> adds
>> >>> and
>> >>>>>> connects a store. Not sure if this is too subtle, if we consider
>> >> that
>> >>>>>> there is already the `StoreSupplier` interface?
>> >>>>>>
>> >>>>>> But maybe somebody else might still have a good idea on how the
>> >>> improve
>> >>>>>> the name.
>> >>>>>>
>> >>>>>> In any case, I would suggest to shorten the name to
>> `StoreConnector`
>> >>>>>> instead of `StateStoreConnector`, because we also have
>> >> `StoreSupplier`
>> >>>>>> and `StoreBuilder`.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> (2) The KIP proposes to add the new interface to
>> `ProcessorSupplier`
>> >>> etc
>> >>>>>> and to add a default implementation for the new method. Hence, user
>> >>>>>> would need to overwrite this default implementation to op-in to the
>> >>>>>> feature. I am wonder if it might be better to not add the new
>> >>> interface
>> >>>>>> to `ProcessorSupplier` etc and to just provide a new interface with
>> >> no
>> >>>>>> default implementation. Users would opt-in by adding the interface
>> >>>>>> explicitly to their existing `ProcessorSupplier` implementation.
>> >>>>>> Overwriting a default method and getting different behavior seems
>> to
>> >>> be
>> >>>>>> a little subtle to me, especially, because we don't want to allow
>> to
>> >>>>>> mix-and-match the old and new approaches. Think: I only overwrite a
>> >>>>>> default method and my code breaks.
>> >>>>>>
>> >>>>>> Thoughts?
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> (3) If we keep the current default implementation for the new
>> >> method,
>> >>> I
>> >>>>>> am wondering if it should return `null` instead of an empty
>> >>> collection?
>> >>>>>> This might be saver to detect bugs in user code for which, per
>> >>> accident,
>> >>>>>> an empty collection could be returned.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> (4) Should the new method return a `Set` instead of a `Collection`
>> >> to
>> >>>>>> indicate the semantics clearly (ie, returning the same
>> >> `StoreBuilder`
>> >>>>>> multiple times is idempotent and one cannot add+connect to it
>> >> twice).
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>> >>>>>>> Ivan and Guozhang,
>> >>>>>>>
>> >>>>>>> Thanks for the thoughts!  Ivan's use case is definitely
>> >> interesting.
>> >>>> The
>> >>>>>>> way I see it, if we can achieve the main goal of the KIP (allowing
>> >>>>>>> Processor/TransformerSuppliers to encapsulate their usage of state
>> >>>>>> stores),
>> >>>>>>> we will enable this kind of thing in "user space" very easily.
>> >>>>>>>
>> >>>>>>> I will say that I'm not totally sure that most use cases of
>> >>> transform()
>> >>>>>> use
>> >>>>>>> just one state store.  It's hard to know since I haven't seen many
>> >>>>>> examples
>> >>>>>>> in public, but my team's usages almost exclusively require
>> multiple
>> >>>> state
>> >>>>>>> stores.  We only reach for the low level processor API when we
>> need
>> >>>> that
>> >>>>>>> complexity, and it's somewhat hard to imagine many use cases that
>> >>> only
>> >>>>>> need
>> >>>>>>> one state store, since the high level DSL can usually accomplish
>> >>> those
>> >>>>>>> tasks.  The example Ivan presented for instance looks like a
>> >>>>>>> stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what
>> >>> sort
>> >>>> of
>> >>>>>>> other usages you're imagining.
>> >>>>>>>
>> >>>>>>> That being said, perhaps the Processor API should really just be
>> >>>>>> considered
>> >>>>>>> a separate paradigm in Streams, not just a lower level that we
>> >> reach
>> >>> to
>> >>>>>>> when necessary.  In which case it would be beneficial to make the
>> >>>> simple
>> >>>>>>> use cases easier.  I've definitely talked about this with my own
>> >>> team -
>> >>>>>> if
>> >>>>>>> you're less familiar with the kind of functional style that the
>> >> high
>> >>>>>> level
>> >>>>>>> DSL offers, it might be easier to "see" your state and interact
>> >> with
>> >>> it
>> >>>>>>> directly.
>> >>>>>>>
>> >>>>>>> Anyway, I've updated the KIP to reflect my current PR with
>> >> Guozhang's
>> >>>>>>> suggestions.  It seems like there is at least some interest in
>> that
>> >>> on
>> >>>>>> its
>> >>>>>>> own and not a ton of pushback, so I think I will try to start a
>> >> vote.
>> >>>>>>>
>> >>>>>>> Paul
>> >>>>>>>
>> >>>>>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <
>> >> iponoma...@mail.ru>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi all!
>> >>>>>>>>
>> >>>>>>>> I was about to write another KIP, but found out that KIP-401
>> >>> addresses
>> >>>>>>>> exactly the problem I faced. So let me jump into your discussion
>> >> and
>> >>>> ask
>> >>>>>>>> you to assess another idea.
>> >>>>>>>>
>> >>>>>>>> I fully agree with the KIP-401's motivation part. E. g in my
>> >>> project I
>> >>>>>> had
>> >>>>>>>> to invent a wrapper class that hides the details of KeyValueStore
>> >>>>>>>> management from business logic. Of course this should be done
>> >> better
>> >>>> in
>> >>>>>>>> KStreams API.
>> >>>>>>>>
>> >>>>>>>> But I was about to look at this problem from another side and
>> >>> propose
>> >>>> a
>> >>>>>>>> simple alternative in high-level DSL, that will not fit all the
>> >>> cases,
>> >>>>>> but
>> >>>>>>>> most of them. Hence my idea does not exclude the Paul's proposal.
>> >>>>>>>>
>> >>>>>>>> What if we restrict ourselves to *only one* KeyValueStore and
>> >>> propose
>> >>>> a
>> >>>>>>>> method that resembles  `aggregate` and `reduce` methods, like
>> >> this:
>> >>>>>>>>
>> >>>>>>>> stream
>> >>>>>>>>    .map(...)
>> >>>>>>>>    .filter(...)
>> >>>>>>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
>> >>>>>>>>
>> >>>>>>>> where
>> >>>>>>>> * k, v -- input key & value
>> >>>>>>>> * s -- a KeyValueStore provided as an argument
>> >>>>>>>> * return value of the lambda should be KeyValue.pair(...)
>> >>>>>>>> * Transformed.with... is a builder, used in order to define the
>> >>>>>>>> Transformer and KeyValueStore building parameters. Some of these
>> >>>>>> parameters
>> >>>>>>>> should be:
>> >>>>>>>> ** store's KeySerde,
>> >>>>>>>> ** store's ValueSerde,
>> >>>>>>>> ** whether the store is persistent or in-memory,
>> >>>>>>>> ** store's name -- optional parameter, the system should be able
>> >> to
>> >>>>>> devise
>> >>>>>>>> the name of the store transparently for the user, if we don't
>> want
>> >>> to
>> >>>>>>>> devise it ourselves/share the store between processors.
>> >>>>>>>> ** scheduled punctuation.
>> >>>>>>>>
>> >>>>>>>> Imagine we have a KStream<String, Integer>, and we need to
>> >>> calculate a
>> >>>>>>>> `derivative` stream, that is, a stream of 'deltas' of the
>> provided
>> >>>>>> integer
>> >>>>>>>> values.
>> >>>>>>>>
>> >>>>>>>> This could be achieved as simple as
>> >>>>>>>>
>> >>>>>>>> stream.transform((key, value, stateStore) -> {
>> >>>>>>>>         int previousValue =
>> >>>>>>>> Optional.ofNullable(stateStore.get(key)).orElse(0);
>> >>>>>>>>         stateStore.put(key, value);
>> >>>>>>>>         return KeyValue.pair(key, value - previousValue);
>> >>>>>>>>         }
>> >>>>>>>>         //we do not need to bother with store name, punctuation
>> >> etc.
>> >>>>>>>>         //may be even Serde part can be omitted, since we can
>> >>> inherit
>> >>>>>> the
>> >>>>>>>> serdes from stream by default
>> >>>>>>>>         , Transformed.with(Serdes.String(), Serdes.Integer())
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> The hard part of it is that new `transform` method definition
>> >> should
>> >>>> be
>> >>>>>>>> parameterized by six type parameters:
>> >>>>>>>>
>> >>>>>>>> * input/output/KeyValueStore key type,
>> >>>>>>>> * input/output/KeyValueStore value type.
>> >>>>>>>>
>> >>>>>>>> However, it seems that all these types can be inferred from the
>> >>>> provided
>> >>>>>>>> lambda and Transformed.with instances.
>> >>>>>>>>
>> >>>>>>>> What do you think about this?
>> >>>>>>>>
>> >>>>>>>> Regards,
>> >>>>>>>>
>> >>>>>>>> Ivan
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 27.03.2019 20:45, Guozhang Wang пишет:
>> >>>>>>>>
>> >>>>>>>> Hello Paul,
>> >>>>>>>>
>> >>>>>>>> Thanks for the uploaded PR and the detailed description! I've
>> >> made a
>> >>>>>> pass
>> >>>>>>>> on it and left some comments.
>> >>>>>>>>
>> >>>>>>>> Overall I think I agree with you that passing in the storebuilder
>> >>>>>> directly
>> >>>>>>>> that store name is more convienent as it does not require another
>> >>>>>>>> `addStore` call, but we just need to spend some more
>> documentation
>> >>>>>> effort
>> >>>>>>>> on educating users about the two ways of connecting their stores.
>> >>> I'm
>> >>>>>>>> slightly concerned about this education curve but I can be
>> >> convinced
>> >>>> if
>> >>>>>>>> most people felt it is worthy.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com>
>> >> <
>> >>>>>> pgwha...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> I'd like to resurrect this discussion with a cursory,
>> >>> proof-of-concept
>> >>>>>>>> implementation of the KIP which combines many of our ideas:
>> >>>>>> https://github.com/apache/kafka/pull/6496.  I tried to keep the
>> >> diff
>> >>> as
>> >>>>>>>> small as possible for now, just using it to convey the main
>> ideas.
>> >>>> But
>> >>>>>>>> I'll separately address some of our earlier discussion:
>> >>>>>>>>
>> >>>>>>>>    - Will there be a new, separate interface for users to
>> >> implement
>> >>>> for
>> >>>>>> the
>> >>>>>>>>    new functionality? No, to hopefully keep things simple, all of
>> >>> the
>> >>>>>>>>    Processor/TransformerSupplier interfaces will just extend
>> >>>>>>>>    StateStoresSupplier, allowing users to opt in to this
>> >>> functionality
>> >>>>>> by
>> >>>>>>>>    overriding the default implementation that gives an empty
>> list.
>> >>>>>>>>    - Will the interface allow users to specify the store name, or
>> >>> the
>> >>>>>>>>    entire StoreBuilder? The entire StoreBuilder, so the
>> >>>>>>>>    Processor/TransformerSupplier can completely encapsulate name
>> >> and
>> >>>>>>>>    implementation of a state store if desired.
>> >>>>>>>>    - Will the old way of specifying store names alongside the
>> >>> supplier
>> >>>>>> when
>> >>>>>>>>    calling stream.process/transform() be deprecated? No, this is
>> >>>> still a
>> >>>>>>>>    legitimate way to wire up Processors/Transformers and their
>> >>> stores.
>> >>>>>> But
>> >>>>>>>> I
>> >>>>>>>>    would recommend not allowing stream.process/transform() calls
>> >>> that
>> >>>>>> use
>> >>>>>>>> both
>> >>>>>>>>    store declaration mechanisms (this restriction is not in the
>> >>> proof
>> >>>> of
>> >>>>>>>>    concept)
>> >>>>>>>>    - How will we handle adding the same state store to the
>> >> topology
>> >>>>>>>>    multiple times because different
>> Processor/TransformerSuppliers
>> >>>>>> declare
>> >>>>>>>> it?
>> >>>>>>>>    topology.addStateStore() will be slightly relaxed for
>> >>> convenience,
>> >>>>>> and
>> >>>>>>>> will
>> >>>>>>>>    allow adding the same StoreBuilder multiple times as long as
>> >> the
>> >>>>>> exact
>> >>>>>>>> same
>> >>>>>>>>    StoreBuilder instance is being added for the same store name.
>> >>> This
>> >>>>>>>> seems
>> >>>>>>>>    to prevent in practice the issue of accidentally making two
>> >> state
>> >>>>>> stores
>> >>>>>>>>    one by adding with the same name.  For additional safety, if
>> we
>> >>>>>> wanted
>> >>>>>>>> to
>> >>>>>>>>    (not in the proof of concept), we could allow for this
>> >> relaxation
>> >>>>>> only
>> >>>>>>>> for
>> >>>>>>>>    internal callers of topology.addStateStore().
>> >>>>>>>>
>> >>>>>>>> So, in summary, the use cases look like:
>> >>>>>>>>
>> >>>>>>>>    - 1 transformer/processor that owns its store: Using the new
>> >>>>>>>>    StateStoresSupplier interface method to supply its
>> >> StoreBuilders
>> >>>> that
>> >>>>>>>> will
>> >>>>>>>>    be added to the topology automatically.
>> >>>>>>>>    - Multiple transformer/processors that share the same store:
>> >>> Either
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>    1. The old way: the StoreBuilder is defined "far away" from
>> the
>> >>>>>>>>    Transformer/Processor implementations, and is added to the
>> >>> topology
>> >>>>>>>>    manually by the user
>> >>>>>>>>    2. The new way: the StoreBuilder is defined closer to the
>> >>>>>>>>    Transformer/Processor implementations, and the same instance
>> is
>> >>>>>>>> returned by
>> >>>>>>>>    all Transformer/ProcessorSuppliers that need it
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> This makes the KIP wiki a bit stale; I'll update if we want to
>> >> bring
>> >>>>>> this
>> >>>>>>>> design to a vote.
>> >>>>>>>>
>> >>>>>>>> Thanks!
>> >>>>>>>> Paul
>> >>>>>>>>
>> >>>>>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <
>> wangg...@gmail.com
>> >>>
>> >>> <
>> >>>>>> wangg...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Matthias / Paul,
>> >>>>>>>>
>> >>>>>>>> The concern I had about introducing `StoreBuilderSupplier` is
>> >> simply
>> >>>>>>>> because it is another XXSupplier to the public API, so I'd like
>> to
>> >>> ask
>> >>>>>> if
>> >>>>>>>> we really have to add it :)
>> >>>>>>>>
>> >>>>>>>> The difference between encapsulating the store name and
>> >>> encapsulating
>> >>>>>> the
>> >>>>>>>> full state store builder is that, in the former:
>> >>>>>>>>
>> >>>>>>>> -----------
>> >>>>>>>>
>> >>>>>>>> String storeName = "store1";
>> >>>>>>>> builder.addStore(new MyStoreBuilder(storeName));
>> >>>>>>>> stream1.transform(new MyTransformerSupplier(storeName));   //
>> >>>> following
>> >>>>>>>>
>> >>>>>>>> my
>> >>>>>>>>
>> >>>>>>>> proposal, that the store name can be passed in and used for both
>> >>>>>>>> `listStores` and in the `Transformer#init`; so the Transformer
>> >>>> function
>> >>>>>>>> does not need to get the constant string name again.
>> >>>>>>>>
>> >>>>>>>>                          // one caveat to admit, is that
>> >>>>>>>> MyTransofmerSupplier logic may be just unique to `store1` so it
>> >>> cannot
>> >>>>>> be
>> >>>>>>>> reused with a different store name anyways.
>> >>>>>>>> -----------
>> >>>>>>>>
>> >>>>>>>> While in the latter:
>> >>>>>>>>
>> >>>>>>>> -----------
>> >>>>>>>>
>> >>>>>>>> stream1.transform(new MyTransformerSupplierForStore1);   // the
>> >> name
>> >>>> is
>> >>>>>>>> just indicating that we may have one such supplier for each
>> store.
>> >>>>>>>>
>> >>>>>>>> -----------
>> >>>>>>>>
>> >>>>>>>> I understand the latter introduce more convenience from the API,
>> >> but
>> >>>> the
>> >>>>>>>> cost is that since we still cannot completely `builder.addStore`,
>> >>> but
>> >>>>>>>>
>> >>>>>>>> only
>> >>>>>>>>
>> >>>>>>>> reduce its semantic scope to shared state stores only,; hence
>> >> users
>> >>>> need
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> learn two ways of creating state stores for those two patterns.
>> >>>>>>>>
>> >>>>>>>> My argument is that more public APIs requires longer learning
>> >> curve
>> >>>> for
>> >>>>>>>> users, and introduces more usage patterns that may confuse users
>> >>> (the
>> >>>>>>>> proposal I had tries to replace one with another completely).
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com>
>> >> <
>> >>>>>> pgwha...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Thanks for the great thoughts Matthias and Guozhang!
>> >>>>>>>>
>> >>>>>>>> If I'm not mistaken, Guozhang's suggestion is what my second
>> >>>>>>>>
>> >>>>>>>> alternative
>> >>>>>>>>
>> >>>>>>>> on
>> >>>>>>>>
>> >>>>>>>> the KIP is ("Have the added method on the Supplier interfaces
>> only
>> >>>>>>>>
>> >>>>>>>> return
>> >>>>>>>>
>> >>>>>>>> store names, not builders").  I do think it would be a worthwhile
>> >>>>>>>>
>> >>>>>>>> usability
>> >>>>>>>>
>> >>>>>>>> improvement on its own, but to Matthias's point, it doesn't
>> >> achieve
>> >>>> the
>> >>>>>>>> full goal of completing encapsulating a state store and it's
>> >>> processor
>> >>>>>>>>
>> >>>>>>>> -
>> >>>>>>>>
>> >>>>>>>> it
>> >>>>>>>>
>> >>>>>>>> encapsulates the name, but not the StateStoreBuilder.
>> >>>>>>>>
>> >>>>>>>> I'm really intrigued by Matthias's idea that forgoes the default
>> >>>>>>>>
>> >>>>>>>> interface
>> >>>>>>>>
>> >>>>>>>> method I proposed.  Having smaller, separate interfaces is a
>> >>> powerful
>> >>>>>>>>
>> >>>>>>>> idea
>> >>>>>>>>
>> >>>>>>>> and I think a cleaner API than what I proposed.  The non-shared
>> >>> store
>> >>>>>>>>
>> >>>>>>>> use
>> >>>>>>>>
>> >>>>>>>> case is handled well here, and the shared store use case is
>> >>> possible,
>> >>>>>>>> though maybe still not as graceful as we would like (having to
>> add
>> >>> the
>> >>>>>>>> StoreBuilderSupplier before the StoreNameSupplier seems maybe too
>> >>>>>>>>
>> >>>>>>>> subtle
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> me).
>> >>>>>>>>
>> >>>>>>>> We're all agreed that one of the big problems with the shared
>> >> store
>> >>>> use
>> >>>>>>>> case is how to deal with adding the same store to the topology
>> >>>> multiple
>> >>>>>>>> times.  Catching the "store already added" exception is risky.
>> >>> Here's
>> >>>>>>>>
>> >>>>>>>> a
>> >>>>>>>>
>> >>>>>>>> maybe radical idea: change `topology.addStateStore()` to be
>> >>> idempotent
>> >>>>>>>>
>> >>>>>>>> for
>> >>>>>>>>
>> >>>>>>>> adding a given state store name and `StoreBuilder`.  In other
>> >> words,
>> >>>>>>>> `addStateStore` would not throw the "store already added"
>> >> exception
>> >>> if
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> `StoreBuilder` being added for a given name has the same identity
>> >> as
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> one that has already been added.  Does this eliminate all the
>> bugs
>> >>>>>>>>
>> >>>>>>>> we're
>> >>>>>>>>
>> >>>>>>>> worried about?  Thinking about it for a few minutes, it seems to
>> >>>>>>>>
>> >>>>>>>> eliminate
>> >>>>>>>>
>> >>>>>>>> most at least (would a user really use the exact same
>> StoreBuilder
>> >>>> when
>> >>>>>>>> they intend there to be two stores?).  It might make the API
>> >>> slightly
>> >>>>>>>> harder to use if a user isn't immediately aware of that subtlety,
>> >>> but
>> >>>> a
>> >>>>>>>> good error message should ease the pain, and it would happen
>> >>>>>>>>
>> >>>>>>>> immediately
>> >>>>>>>>
>> >>>>>>>> during development.
>> >>>>>>>>
>> >>>>>>>> And with regards to Matthias's comment about whether we need to
>> >>>>>>>>
>> >>>>>>>> deprecate
>> >>>>>>>>
>> >>>>>>>> existing varargs transform methods - I don't think we need to,
>> but
>> >>> it
>> >>>>>>>>
>> >>>>>>>> might
>> >>>>>>>>
>> >>>>>>>> be nice for there only to be one way to do things, assuming
>> >> whatever
>> >>>> we
>> >>>>>>>> come up with supports all existing use cases.  I don't feel
>> >> strongly
>> >>>>>>>>
>> >>>>>>>> about
>> >>>>>>>>
>> >>>>>>>> this, but if we don't deprecate, I do think it's important to add
>> >>>>>>>>
>> >>>>>>>> checks
>> >>>>>>>>
>> >>>>>>>> that prevent users from trying to do the same thing in two
>> >> different
>> >>>>>>>>
>> >>>>>>>> ways,
>> >>>>>>>>
>> >>>>>>>> as we've discussed.
>> >>>>>>>>
>> >>>>>>>> Paul
>> >>>>>>>>
>> >>>>>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <
>> >>>> matth...@confluent.io
>> >>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang,
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Regarding the last option to catch "store exist already"
>> exception
>> >>>>>>>>
>> >>>>>>>> and
>> >>>>>>>>
>> >>>>>>>> fallback to connect stores, I'm a bit concerned it may be hiding
>> >>>>>>>>
>> >>>>>>>> actual
>> >>>>>>>>
>> >>>>>>>> user bugs.
>> >>>>>>>>
>> >>>>>>>> I agree with this concern. From my original email:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> The only disadvantage I see, might be
>> >>>>>>>> potential bugs about sharing state if two different stores are
>> >>>>>>>>
>> >>>>>>>> named
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> same by mistake (this would not be detected).
>> >>>>>>>>
>> >>>>>>>> For your new proposal: I am not sure if it addresses Paul's
>> >> original
>> >>>>>>>> idea -- I hope Paul can clarify. From my understanding, the idea
>> >> was
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> encapsulate a store and its processor. As many stores are not
>> >>> shared,
>> >>>>>>>> this seems to be quite useful. Your proposal falls a little short
>> >> to
>> >>>>>>>> support encapsulation for none-shared stores.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>> >>>>>>>>
>> >>>>>>>> Matthias,
>> >>>>>>>>
>> >>>>>>>> Thanks for your feedbacks.
>> >>>>>>>>
>> >>>>>>>> Regarding the last option to catch "store exist already"
>> exception
>> >>>>>>>>
>> >>>>>>>> and
>> >>>>>>>>
>> >>>>>>>> fallback to connect stores, I'm a bit concerned it may be hiding
>> >>>>>>>>
>> >>>>>>>> actual
>> >>>>>>>>
>> >>>>>>>> user bugs.
>> >>>>>>>>
>> >>>>>>>> Thinking about Paul's proposal and your suggestion again, I'd
>> like
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> propose another alternative somewhere in the middle of your
>> >>>>>>>>
>> >>>>>>>> approaches,
>> >>>>>>>>
>> >>>>>>>> i.e. we still let users to create sharable state stores via
>> >>>>>>>> `addStateStore`, and we allow the TransformerSupplier to return a
>> >>>>>>>>
>> >>>>>>>> list
>> >>>>>>>>
>> >>>>>>>> of
>> >>>>>>>>
>> >>>>>>>> state stores that it needs, i.e.:
>> >>>>>>>>
>> >>>>>>>> public interface TransformerSupplier<K, V, R> {
>> >>>>>>>>     Transformer<K, V, R> get();
>> >>>>>>>>     default List<String> stateStoreNames() {
>> >>>>>>>>         return Collections.emptyList();
>> >>>>>>>> <
>> >> https://cwiki.apache.org/confluence/pages/Collections.emptyList()
>> >>>>>>>>
>> >>>>>>>> ;>
>> >>>>>>>>
>> >>>>>>>>     }
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> by doing this users can still "consolidate" the references of
>> >> store
>> >>>>>>>>
>> >>>>>>>> names
>> >>>>>>>>
>> >>>>>>>> in a single place in the transform call, e.g.:
>> >>>>>>>>
>> >>>>>>>> public class MyTransformerSupplier<K, V, R> {
>> >>>>>>>>     private String storeName;
>> >>>>>>>>
>> >>>>>>>>     public class MyTransformer<K, V, R> {
>> >>>>>>>>
>> >>>>>>>>        ....
>> >>>>>>>>
>> >>>>>>>>        init() {
>> >>>>>>>>           store = context.getStateStore(storeName);
>> >>>>>>>>        }
>> >>>>>>>>     }
>> >>>>>>>>
>> >>>>>>>>     default List<String> stateStoreNames() {
>> >>>>>>>>         return Collections.singletonList(storeName);
>> >>>>>>>> <
>> >> https://cwiki.apache.org/confluence/pages/Collections.emptyList()
>> >>>>>>>>
>> >>>>>>>> ;>
>> >>>>>>>>
>> >>>>>>>>     }
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> Basically, we move the parameters from the caller of `transform`
>> >> to
>> >>>>>>>>
>> >>>>>>>> inside
>> >>>>>>>>
>> >>>>>>>> the TransformSuppliers. DSL implementations would not change
>> much,
>> >>>>>>>>
>> >>>>>>>> simply
>> >>>>>>>>
>> >>>>>>>> calling `connectStateStore` by getting the list of names from the
>> >>>>>>>>
>> >>>>>>>> provided
>> >>>>>>>>
>> >>>>>>>> function.
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <
>> >>>>>>>>
>> >>>>>>>> matth...@confluent.io
>> >>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Just a meta comment: do we really need to deprecate existing
>> >>>>>>>> `transform()` etc methods?
>> >>>>>>>>
>> >>>>>>>> The last argument is a vararg, and thus, just keeping the
>> existing
>> >>>>>>>>
>> >>>>>>>> API
>> >>>>>>>>
>> >>>>>>>> for this part seems to work too, allowing to implement both
>> >>>>>>>>
>> >>>>>>>> patterns?
>> >>>>>>>>
>> >>>>>>>> Also, instead of adding a default method, we could also add a new
>> >>>>>>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
>> >>>>>>>> stateStores()` -- users could implement `TransformerSupplier` and
>> >>>>>>>> `StoreBuilderSupplier` at once; and for this case, we require
>> that
>> >>>>>>>>
>> >>>>>>>> users
>> >>>>>>>>
>> >>>>>>>> don't provide store name in `transform()`.
>> >>>>>>>>
>> >>>>>>>> Similar, we could add an interface `StoreNameSupplier` with
>> method
>> >>>>>>>> `List<String> stateStores()`. This allows to "auto-wire" a
>> >>>>>>>>
>> >>>>>>>> transformer
>> >>>>>>>>
>> >>>>>>>> to existing stores (to avoid the issue to add the same store
>> >>>>>>>>
>> >>>>>>>> multiple
>> >>>>>>>>
>> >>>>>>>> times).
>> >>>>>>>>
>> >>>>>>>> Hence, for shared stores, there would be one "main" transformer
>> >>>>>>>>
>> >>>>>>>> that
>> >>>>>>>>
>> >>>>>>>> implements `StoreBuilderSupplier` and that must be added first to
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> topology. The other transformers would implement
>> >>>>>>>>
>> >>>>>>>> `StoreNameSupplier`
>> >>>>>>>>
>> >>>>>>>> and
>> >>>>>>>>
>> >>>>>>>> just connect to those stores.
>> >>>>>>>>
>> >>>>>>>> Another possibility to avoid the issue of adding the same stores
>> >>>>>>>> multiple times would be, that the DSL always calls
>> >>>>>>>>
>> >>>>>>>> `addStateStore()`
>> >>>>>>>>
>> >>>>>>>> but
>> >>>>>>>>
>> >>>>>>>> catches a potential "store exists already" exception and falls
>> >>>>>>>>
>> >>>>>>>> back
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> `connectProcessorAndStateStore()` for this case. Thus, we would
>> >>>>>>>>
>> >>>>>>>> not
>> >>>>>>>>
>> >>>>>>>> need
>> >>>>>>>>
>> >>>>>>>> the `StoreNameSupplier` interface and the order in which
>> >>>>>>>>
>> >>>>>>>> transformers
>> >>>>>>>>
>> >>>>>>>> are added would not matter either. The only disadvantage I see,
>> >>>>>>>>
>> >>>>>>>> might
>> >>>>>>>>
>> >>>>>>>> be
>> >>>>>>>>
>> >>>>>>>> potential bugs about sharing state if two different stores are
>> >>>>>>>>
>> >>>>>>>> named
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> same by mistake (this would not be detected).
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Just some ideas I wanted to share. What do you think?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>> >>>>>>>>
>> >>>>>>>> Ah yes of course, this was an oversight, I completely ignored the
>> >>>>>>>>
>> >>>>>>>> multiple
>> >>>>>>>>
>> >>>>>>>> processors sharing the same state store when writing up the KIP.
>> >>>>>>>>
>> >>>>>>>> Which
>> >>>>>>>>
>> >>>>>>>> is
>> >>>>>>>>
>> >>>>>>>> funny, because I've actually done this (different processors
>> >>>>>>>>
>> >>>>>>>> sharing
>> >>>>>>>>
>> >>>>>>>> state
>> >>>>>>>>
>> >>>>>>>> stores) a fair amount myself, and I've settled on a pattern
>> >>>>>>>>
>> >>>>>>>> where I
>> >>>>>>>>
>> >>>>>>>> group
>> >>>>>>>>
>> >>>>>>>> the Processors in an enclosing class, and that enclosing class
>> >>>>>>>>
>> >>>>>>>> handles
>> >>>>>>>>
>> >>>>>>>> as
>> >>>>>>>>
>> >>>>>>>> much as possible.  Here's a gist showing the rough structure,
>> >>>>>>>>
>> >>>>>>>> just
>> >>>>>>>>
>> >>>>>>>> for
>> >>>>>>>>
>> >>>>>>>> context:
>> >>>>>>>>
>> >>>>>>>>
>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>> >>>>>>>>
>> >>>>>>>> . Note how it adds the stores to the topology, as well as
>> >>>>>>>>
>> >>>>>>>> providing a
>> >>>>>>>>
>> >>>>>>>> public method with the store names.
>> >>>>>>>>
>> >>>>>>>> I don't think my proposal completely conflicts with the multiple
>> >>>>>>>>
>> >>>>>>>> processors
>> >>>>>>>>
>> >>>>>>>> sharing state stores use case, since you can create a supplier
>> >>>>>>>>
>> >>>>>>>> that
>> >>>>>>>>
>> >>>>>>>> provides the store name you want, somewhat independently of your
>> >>>>>>>>
>> >>>>>>>> actual
>> >>>>>>>>
>> >>>>>>>> Processor logic.  The issue I do see though, is that
>> >>>>>>>> topology.addStateStore() can only be called once for a given
>> >>>>>>>>
>> >>>>>>>> store.
>> >>>>>>>>
>> >>>>>>>> So
>> >>>>>>>>
>> >>>>>>>> for
>> >>>>>>>>
>> >>>>>>>> your example, if the there was a single TransformerSupplier that
>> >>>>>>>>
>> >>>>>>>> was
>> >>>>>>>>
>> >>>>>>>> passed
>> >>>>>>>>
>> >>>>>>>> into both transform() calls, "store1" would be added (under the
>> >>>>>>>>
>> >>>>>>>> hood)
>> >>>>>>>>
>> >>>>>>>> to
>> >>>>>>>>
>> >>>>>>>> the topology twice, which is no good.
>> >>>>>>>>
>> >>>>>>>> Perhaps this suggests that one of my alternatives on the KIP
>> >>>>>>>>
>> >>>>>>>> might
>> >>>>>>>>
>> >>>>>>>> be
>> >>>>>>>>
>> >>>>>>>> desirable: either not having the suppliers return StoreBuilders
>> >>>>>>>>
>> >>>>>>>> (just
>> >>>>>>>>
>> >>>>>>>> store
>> >>>>>>>>
>> >>>>>>>> names), or not deprecating the old methods that take "String...
>> >>>>>>>> stateStoreNames". I'll have to think about it a bit.
>> >>>>>>>>
>> >>>>>>>> Paul
>> >>>>>>>>
>> >>>>>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <
>> >>>>>>>>
>> >>>>>>>> wangg...@gmail.com>
>> >>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>> Hello Paul,
>> >>>>>>>>
>> >>>>>>>> Thanks for the great writeup (very detailed and crystal
>> >>>>>>>>
>> >>>>>>>> motivation
>> >>>>>>>>
>> >>>>>>>> sections!).
>> >>>>>>>>
>> >>>>>>>> This is quite an interesting idea and I do like the API
>> >>>>>>>>
>> >>>>>>>> cleanness
>> >>>>>>>>
>> >>>>>>>> you
>> >>>>>>>>
>> >>>>>>>> proposed. The original motivation of letting StreamsTopology to
>> >>>>>>>>
>> >>>>>>>> add
>> >>>>>>>>
>> >>>>>>>> state
>> >>>>>>>>
>> >>>>>>>> stores though, is to allow different processors to share the
>> >>>>>>>>
>> >>>>>>>> state
>> >>>>>>>>
>> >>>>>>>> store.
>> >>>>>>>>
>> >>>>>>>> For example:
>> >>>>>>>>
>> >>>>>>>> builder.addStore("store1");
>> >>>>>>>>
>> >>>>>>>> // a path of stream transformations that leads to KStream
>> >>>>>>>>
>> >>>>>>>> stream1.
>> >>>>>>>>
>> >>>>>>>> stream1.transform(..., "store1");
>> >>>>>>>>
>> >>>>>>>> // another path that generates a KStream stream2.
>> >>>>>>>> stream2.transform(..., "store1");
>> >>>>>>>>
>> >>>>>>>> Behind the scene, Streams will make sure stream1 / stream2
>> >>>>>>>>
>> >>>>>>>> transformations
>> >>>>>>>>
>> >>>>>>>> will always be grouped together as a single group of tasks, each
>> >>>>>>>>
>> >>>>>>>> of
>> >>>>>>>>
>> >>>>>>>> which
>> >>>>>>>>
>> >>>>>>>> will be executed by a single thread and hence there's no
>> >>>>>>>>
>> >>>>>>>> concurrency
>> >>>>>>>>
>> >>>>>>>> issues
>> >>>>>>>>
>> >>>>>>>> on accessing the store from different operators within the same
>> >>>>>>>>
>> >>>>>>>> task.
>> >>>>>>>>
>> >>>>>>>> I'm
>> >>>>>>>>
>> >>>>>>>> not sure how common this use case is, but I'd like to hear if
>> >>>>>>>>
>> >>>>>>>> you
>> >>>>>>>>
>> >>>>>>>> have
>> >>>>>>>>
>> >>>>>>>> any
>> >>>>>>>>
>> >>>>>>>> thoughts maintaining this since the current proposal seems
>> >>>>>>>>
>> >>>>>>>> exclude
>> >>>>>>>>
>> >>>>>>>> this
>> >>>>>>>>
>> >>>>>>>> possibility.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
>> <
>> >>>>>> pgwha...@gmail.com>
>> >>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
>> >>>>>>>>
>> >>>>>>>> that
>> >>>>>>>>
>> >>>>>>>> I
>> >>>>>>>>
>> >>>>>>>> think could greatly increase the usability of the low-level
>> >>>>>>>>
>> >>>>>>>> processor
>> >>>>>>>>
>> >>>>>>>> API.
>> >>>>>>>>
>> >>>>>>>> I have some code written but will wait to see if there is buy
>> >>>>>>>>
>> >>>>>>>> in
>> >>>>>>>>
>> >>>>>>>> before
>> >>>>>>>>
>> >>>>>>>> going all out and creating a pull request.  It seems like most
>> >>>>>>>>
>> >>>>>>>> of
>> >>>>>>>>
>> >>>>>>>> the
>> >>>>>>>>
>> >>>>>>>> work
>> >>>>>>>>
>> >>>>>>>> would be in updating documentation and tests.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>> >>>>>>>>
>> >>>>>>>> Thanks!
>> >>>>>>>> Paul
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>>

Reply via email to