Hi Paul,

I will try to express myself a bit clearer.

Ad 1)
My assumption is that if `StateStoreConnector#stateStores()` returns `null`
Kafka Streams will throw an NPE because on purpose no null check is
performed before the loop that calls `StreamsBuilder#addStateStore()`. When
the user finally understands the cause of the NPE, she knows that she has
to override `StateStoreConnector#stateStores()` in her implementation. My
question was, why let the user discover that she has to overwrite the
method at runtime if you could not provide a default implementation for
`StateStoreConnector#stateStores()` and let the compiler tell the user the
need to overwrite the method. Not providing a default implementation
without separating the interfaces implies not being backward-compatible.
That means, if we choose to not provide a default implementation and let
the compiler signal the necessity to override the method, we have to
separate the interfaces in any case.

Ad 2)
If you check for `null` or empty list in `process` and do not call
`addStateStores` in those cases, the advantage of returning `null` to be
saver to detect bugs as mentioned by Matthias would be lost. But maybe I am
missing something here.

Best,
Bruno



On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> wrote:

> I definitely don't mind anyone jumping, Bruno, thanks for the comments!
>
> 1) I'm not totally sure I'm clear on your point, but I think we're on the
> same page - if we're adding a method to the XSupplier interfaces (by making
> them inherit from a super interface StateStoreConnector) then we definitely
> need a default implementation to maintain compatibility.  Whether the
> default implementation returns null or an empty list is somewhat of a
> detail.
>
> 2) If stream.process() sees that StateStoreConnector#stateStores() returns
> either null or an empty list, it would handle that case specifically and
> not try to call addStateStore at all.  Or is this not what you're asking?
>
> Separately, I'm still hacking away at the details of the PR and will
> continue to get something into a discussable state, but I'll share some
> thoughts I've run into.
>
> A) I'm tentatively going the separate interface route (Matthias's
> suggestion) and naming it ConnectedStoreProvider.  Still don't love the
> name, but there's something nice about the name indicating *why* this thing
> is providing the store, not just that it is providing it.
>
> B) It has occurred to me that topology.addProcessor() could also recognize
> if ProcessorSupplier implements ConnectedStoreProvider and add and connect
> stores appropriately.  This isn't in the KIP and I think the value-add is
> lower (if you're reaching that low level, surely the "auto add/connect
> store" isn't too important to you), but I think it would be a confusing if
> it didn't, and I don't see any real downside.
>
> Paul
>
> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi,
> >
> > @Paul: Thank you for the KIP!
> >
> > I hope you do not mind that I jump in.
> >
> > I have the following comments:
> >
> > 1) `null` vs empty list in the default implementation
> > IIUC, returning `null` in the default implementation should basically
> > signal that the method `stateStores` was not overridden. Why then
> provide a
> > default implementation in the first place? Without default implementation
> > you would discover the missing implementation already at compile-time and
> > not only at runtime. If you decide not to provide a default
> implementation,
> > `XSupplier extends StateStoreConnector` would break existing code as
> > Matthias has already pointed out.
> >
> > 2) `process` method adding the StoreBuilders to the topology
> > If the default implementation returned `null` and `XSupplier extends
> > StateStoreConnector`, then existing code would break, because
> > `StreamsBuilder#addStateStore()` would throw a NPE.
> >
> > +1 for opening a WIP PR
> >
> > Best,
> > Bruno
> >
> >
> > On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thank Paul!
> > >
> > > I agree with all of that. If we think that the general design is good,
> > > refactoring a PR if we want to pick a different name should not be too
> > > much additional work (hopefully). Thus, if you want to open a WIP PR
> and
> > > we use it to nail the open details, it might help to find a good
> > > conclusion.
> > >
> > > >> 2) Default method vs new interface:
> > >
> > > This seems to be the hardest tradeoff. I see the point about
> > > discoveability... Might be good to get input from others, which version
> > > they would prefer.
> > >
> > > Just to make clear, my suggestion from the last email was, that
> > > `Transformer` etc does not extend the new interface. Instead, a user
> > > that want to use this feature would need to implement both interfaces.
> > >
> > > If `Transformer extends StoreProvider` (just picking a name here)
> > > without default implementation existing code would break and thus it
> not
> > > a an option because of breaking backward compatibility.
> > >
> > >
> > > -Matthias
> > >
> > > On 4/28/19 8:37 PM, Paul Whalen wrote:
> > > > Great thoughts Matthias, thanks! I think we're all agreed that naming
> > and
> > > > documentation/education are the biggest hurdles for this KIP, and in
> > > light
> > > > of that, I think it makes sense for me to just take a stab at a full
> > > > fledged PR with documentation to convince us that it's possible to do
> > it
> > > > with enough clarity.
> > > >
> > > > In response to your specific thoughts:
> > > >
> > > > 1) StateStoreConnector as a name: Really good point about defining
> the
> > > > difference between "adding" and "connecting."  Guozhang suggested
> > > > StateStoreConnector which was definitely an improvement over my
> > > > StateStoresSupplier, but I think you're right that we need to be
> > careful
> > > to
> > > > make it clear that it's really accomplishing both.  Thinking about it
> > > now,
> > > > one problem with Connector is that the implementer of the interface
> is
> > > not
> > > > really doing any connecting, it's providing/supplying the store that
> > will
> > > > be both added and connected.  StoreProvider seems reasonable to me
> and
> > > > probably the best candidate at the moment, but it would be nice if
> the
> > > name
> > > > could convey that it's providing the store specifically so the caller
> > can
> > > > add it to the topology and connect it to the associated transformer.
> > > >
> > > > In general I think that really calling out what "adding" versus
> > > > "connecting" is in the documentation will help make the entire
> purpose
> > of
> > > > this feature more clear to the user.
> > > >
> > > > 2) Default method vs new interface: The choice of a default method
> was
> > > > influenced by Guozhang's fear about API bloat/discoverability.  I can
> > > > definitely see it both ways   Would the separate interface be a
> > > > sub-interface of Processor/TransformerSupplier or standalone?  It
> seems
> > > > like you're suggesting standalone and I think that's what I favor.
> My
> > > only
> > > > concern there is that the interface wouldn't actually be a type to
> any
> > > > public API which sort of hurts discoverability.  You would have to
> read
> > > the
> > > > javadocs for stream.process/transform() to discover that implementing
> > the
> > > > interface in addition to Processor/TransformerSupplier would add and
> > > > connect the store for you.  But that added burden actually probably
> > helps
> > > > us in terms of making sure people don't mix and match, like you said.
> > > >
> > > > 3) Returning null instead of empty: Seems fair to me.  I always worry
> > > about
> > > > returning null when an empty collection can be used instead, but
> given
> > > that
> > > > the library is the caller rather than the client I think your point
> > makes
> > > > sense here.
> > > >
> > > > 4) Returning Set instead of Collection: Agreed, don't see why not to
> > make
> > > > it more specific.
> > > >
> > > > Paul
> > > >
> > > > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <
> matth...@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> Hi, sorry for the long pause. Just trying to catch up here.
> > > >>
> > > >> I think it save to allow `addStateStore()` to be idempotent for the
> > same
> > > >> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus
> > it's
> > > >> not really possible to use the same `StoreBuilder` object to create
> > > >> different stores.
> > > >>
> > > >> I also agree with the concern, that only allowing a single store (as
> > > >> proposed by Ivan) might be too restrictive.
> > > >>
> > > >> Overall, the current KIP version LGTM. I don't have mayor concerns
> > about
> > > >> user education for this case, but I agree that we need to document
> > this
> > > >> clearly.
> > > >>
> > > >> Some further comments:
> > > >>
> > > >> (1) I am not sure if `StateStoreConnector` is the best name for the
> > new
> > > >> interface. Note, that there are two concepts about stores:
> > > >>
> > > >>  - adding a store: this makes the store available in the topology in
> > > >> general (however, the store is still "dangling", and not used)
> > > >>  - connecting a store: this allows a processor etc to use a store
> > > >>
> > > >> The new interface does both, but its name only indicates that second
> > > >> part what might be confusing. It might be especially confusing
> because
> > > >> we want to disallow to mix the exiting "manually add and connect"
> > > >> pattern, with a new pattern to "auto add+connect". If the new
> > interface
> > > >> name indicates the connect part only, user might think they need to
> > add
> > > >> stores manually and can connect automatically.
> > > >>
> > > >> Unfortunately, I don't have a much better suggestion for a name
> > either.
> > > >> The only idea that came to my mind was `StoreProvider`: to me, a
> > > >> provider is a "service" interface that does work for us, ie, it adds
> > and
> > > >> connects a store. Not sure if this is too subtle, if we consider
> that
> > > >> there is already the `StoreSupplier` interface?
> > > >>
> > > >> But maybe somebody else might still have a good idea on how the
> > improve
> > > >> the name.
> > > >>
> > > >> In any case, I would suggest to shorten the name to `StoreConnector`
> > > >> instead of `StateStoreConnector`, because we also have
> `StoreSupplier`
> > > >> and `StoreBuilder`.
> > > >>
> > > >>
> > > >>
> > > >> (2) The KIP proposes to add the new interface to `ProcessorSupplier`
> > etc
> > > >> and to add a default implementation for the new method. Hence, user
> > > >> would need to overwrite this default implementation to op-in to the
> > > >> feature. I am wonder if it might be better to not add the new
> > interface
> > > >> to `ProcessorSupplier` etc and to just provide a new interface with
> no
> > > >> default implementation. Users would opt-in by adding the interface
> > > >> explicitly to their existing `ProcessorSupplier` implementation.
> > > >> Overwriting a default method and getting different behavior seems to
> > be
> > > >> a little subtle to me, especially, because we don't want to allow to
> > > >> mix-and-match the old and new approaches. Think: I only overwrite a
> > > >> default method and my code breaks.
> > > >>
> > > >> Thoughts?
> > > >>
> > > >>
> > > >>
> > > >> (3) If we keep the current default implementation for the new
> method,
> > I
> > > >> am wondering if it should return `null` instead of an empty
> > collection?
> > > >> This might be saver to detect bugs in user code for which, per
> > accident,
> > > >> an empty collection could be returned.
> > > >>
> > > >>
> > > >>
> > > >> (4) Should the new method return a `Set` instead of a `Collection`
> to
> > > >> indicate the semantics clearly (ie, returning the same
> `StoreBuilder`
> > > >> multiple times is idempotent and one cannot add+connect to it
> twice).
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On 4/6/19 12:27 PM, Paul Whalen wrote:
> > > >>> Ivan and Guozhang,
> > > >>>
> > > >>> Thanks for the thoughts!  Ivan's use case is definitely
> interesting.
> > > The
> > > >>> way I see it, if we can achieve the main goal of the KIP (allowing
> > > >>> Processor/TransformerSuppliers to encapsulate their usage of state
> > > >> stores),
> > > >>> we will enable this kind of thing in "user space" very easily.
> > > >>>
> > > >>> I will say that I'm not totally sure that most use cases of
> > transform()
> > > >> use
> > > >>> just one state store.  It's hard to know since I haven't seen many
> > > >> examples
> > > >>> in public, but my team's usages almost exclusively require multiple
> > > state
> > > >>> stores.  We only reach for the low level processor API when we need
> > > that
> > > >>> complexity, and it's somewhat hard to imagine many use cases that
> > only
> > > >> need
> > > >>> one state store, since the high level DSL can usually accomplish
> > those
> > > >>> tasks.  The example Ivan presented for instance looks like a
> > > >>> stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what
> > sort
> > > of
> > > >>> other usages you're imagining.
> > > >>>
> > > >>> That being said, perhaps the Processor API should really just be
> > > >> considered
> > > >>> a separate paradigm in Streams, not just a lower level that we
> reach
> > to
> > > >>> when necessary.  In which case it would be beneficial to make the
> > > simple
> > > >>> use cases easier.  I've definitely talked about this with my own
> > team -
> > > >> if
> > > >>> you're less familiar with the kind of functional style that the
> high
> > > >> level
> > > >>> DSL offers, it might be easier to "see" your state and interact
> with
> > it
> > > >>> directly.
> > > >>>
> > > >>> Anyway, I've updated the KIP to reflect my current PR with
> Guozhang's
> > > >>> suggestions.  It seems like there is at least some interest in that
> > on
> > > >> its
> > > >>> own and not a ton of pushback, so I think I will try to start a
> vote.
> > > >>>
> > > >>> Paul
> > > >>>
> > > >>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <
> iponoma...@mail.ru>
> > > >> wrote:
> > > >>>
> > > >>>> Hi all!
> > > >>>>
> > > >>>> I was about to write another KIP, but found out that KIP-401
> > addresses
> > > >>>> exactly the problem I faced. So let me jump into your discussion
> and
> > > ask
> > > >>>> you to assess another idea.
> > > >>>>
> > > >>>> I fully agree with the KIP-401's motivation part. E. g in my
> > project I
> > > >> had
> > > >>>> to invent a wrapper class that hides the details of KeyValueStore
> > > >>>> management from business logic. Of course this should be done
> better
> > > in
> > > >>>> KStreams API.
> > > >>>>
> > > >>>> But I was about to look at this problem from another side and
> > propose
> > > a
> > > >>>> simple alternative in high-level DSL, that will not fit all the
> > cases,
> > > >> but
> > > >>>> most of them. Hence my idea does not exclude the Paul's proposal.
> > > >>>>
> > > >>>> What if we restrict ourselves to *only one* KeyValueStore and
> > propose
> > > a
> > > >>>> method that resembles  `aggregate` and `reduce` methods, like
> this:
> > > >>>>
> > > >>>> stream
> > > >>>>    .map(...)
> > > >>>>    .filter(...)
> > > >>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
> > > >>>>
> > > >>>> where
> > > >>>> * k, v -- input key & value
> > > >>>> * s -- a KeyValueStore provided as an argument
> > > >>>> * return value of the lambda should be KeyValue.pair(...)
> > > >>>> * Transformed.with... is a builder, used in order to define the
> > > >>>> Transformer and KeyValueStore building parameters. Some of these
> > > >> parameters
> > > >>>> should be:
> > > >>>> ** store's KeySerde,
> > > >>>> ** store's ValueSerde,
> > > >>>> ** whether the store is persistent or in-memory,
> > > >>>> ** store's name -- optional parameter, the system should be able
> to
> > > >> devise
> > > >>>> the name of the store transparently for the user, if we don't want
> > to
> > > >>>> devise it ourselves/share the store between processors.
> > > >>>> ** scheduled punctuation.
> > > >>>>
> > > >>>> Imagine we have a KStream<String, Integer>, and we need to
> > calculate a
> > > >>>> `derivative` stream, that is, a stream of 'deltas' of the provided
> > > >> integer
> > > >>>> values.
> > > >>>>
> > > >>>> This could be achieved as simple as
> > > >>>>
> > > >>>> stream.transform((key, value, stateStore) -> {
> > > >>>>         int previousValue =
> > > >>>> Optional.ofNullable(stateStore.get(key)).orElse(0);
> > > >>>>         stateStore.put(key, value);
> > > >>>>         return KeyValue.pair(key, value - previousValue);
> > > >>>>         }
> > > >>>>         //we do not need to bother with store name, punctuation
> etc.
> > > >>>>         //may be even Serde part can be omitted, since we can
> > inherit
> > > >> the
> > > >>>> serdes from stream by default
> > > >>>>         , Transformed.with(Serdes.String(), Serdes.Integer())
> > > >>>> }
> > > >>>>
> > > >>>> The hard part of it is that new `transform` method definition
> should
> > > be
> > > >>>> parameterized by six type parameters:
> > > >>>>
> > > >>>> * input/output/KeyValueStore key type,
> > > >>>> * input/output/KeyValueStore value type.
> > > >>>>
> > > >>>> However, it seems that all these types can be inferred from the
> > > provided
> > > >>>> lambda and Transformed.with instances.
> > > >>>>
> > > >>>> What do you think about this?
> > > >>>>
> > > >>>> Regards,
> > > >>>>
> > > >>>> Ivan
> > > >>>>
> > > >>>>
> > > >>>> 27.03.2019 20:45, Guozhang Wang пишет:
> > > >>>>
> > > >>>> Hello Paul,
> > > >>>>
> > > >>>> Thanks for the uploaded PR and the detailed description! I've
> made a
> > > >> pass
> > > >>>> on it and left some comments.
> > > >>>>
> > > >>>> Overall I think I agree with you that passing in the storebuilder
> > > >> directly
> > > >>>> that store name is more convienent as it does not require another
> > > >>>> `addStore` call, but we just need to spend some more documentation
> > > >> effort
> > > >>>> on educating users about the two ways of connecting their stores.
> > I'm
> > > >>>> slightly concerned about this education curve but I can be
> convinced
> > > if
> > > >>>> most people felt it is worthy.
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com>
> <
> > > >> pgwha...@gmail.com> wrote:
> > > >>>>
> > > >>>>
> > > >>>> I'd like to resurrect this discussion with a cursory,
> > proof-of-concept
> > > >>>> implementation of the KIP which combines many of our ideas:
> > > >> https://github.com/apache/kafka/pull/6496.  I tried to keep the
> diff
> > as
> > > >>>> small as possible for now, just using it to convey the main ideas.
> > > But
> > > >>>> I'll separately address some of our earlier discussion:
> > > >>>>
> > > >>>>    - Will there be a new, separate interface for users to
> implement
> > > for
> > > >> the
> > > >>>>    new functionality? No, to hopefully keep things simple, all of
> > the
> > > >>>>    Processor/TransformerSupplier interfaces will just extend
> > > >>>>    StateStoresSupplier, allowing users to opt in to this
> > functionality
> > > >> by
> > > >>>>    overriding the default implementation that gives an empty list.
> > > >>>>    - Will the interface allow users to specify the store name, or
> > the
> > > >>>>    entire StoreBuilder? The entire StoreBuilder, so the
> > > >>>>    Processor/TransformerSupplier can completely encapsulate name
> and
> > > >>>>    implementation of a state store if desired.
> > > >>>>    - Will the old way of specifying store names alongside the
> > supplier
> > > >> when
> > > >>>>    calling stream.process/transform() be deprecated? No, this is
> > > still a
> > > >>>>    legitimate way to wire up Processors/Transformers and their
> > stores.
> > > >> But
> > > >>>> I
> > > >>>>    would recommend not allowing stream.process/transform() calls
> > that
> > > >> use
> > > >>>> both
> > > >>>>    store declaration mechanisms (this restriction is not in the
> > proof
> > > of
> > > >>>>    concept)
> > > >>>>    - How will we handle adding the same state store to the
> topology
> > > >>>>    multiple times because different Processor/TransformerSuppliers
> > > >> declare
> > > >>>> it?
> > > >>>>    topology.addStateStore() will be slightly relaxed for
> > convenience,
> > > >> and
> > > >>>> will
> > > >>>>    allow adding the same StoreBuilder multiple times as long as
> the
> > > >> exact
> > > >>>> same
> > > >>>>    StoreBuilder instance is being added for the same store name.
> > This
> > > >>>> seems
> > > >>>>    to prevent in practice the issue of accidentally making two
> state
> > > >> stores
> > > >>>>    one by adding with the same name.  For additional safety, if we
> > > >> wanted
> > > >>>> to
> > > >>>>    (not in the proof of concept), we could allow for this
> relaxation
> > > >> only
> > > >>>> for
> > > >>>>    internal callers of topology.addStateStore().
> > > >>>>
> > > >>>> So, in summary, the use cases look like:
> > > >>>>
> > > >>>>    - 1 transformer/processor that owns its store: Using the new
> > > >>>>    StateStoresSupplier interface method to supply its
> StoreBuilders
> > > that
> > > >>>> will
> > > >>>>    be added to the topology automatically.
> > > >>>>    - Multiple transformer/processors that share the same store:
> > Either
> > > >>>>
> > > >>>>
> > > >>>>    1. The old way: the StoreBuilder is defined "far away" from the
> > > >>>>    Transformer/Processor implementations, and is added to the
> > topology
> > > >>>>    manually by the user
> > > >>>>    2. The new way: the StoreBuilder is defined closer to the
> > > >>>>    Transformer/Processor implementations, and the same instance is
> > > >>>> returned by
> > > >>>>    all Transformer/ProcessorSuppliers that need it
> > > >>>>
> > > >>>>
> > > >>>> This makes the KIP wiki a bit stale; I'll update if we want to
> bring
> > > >> this
> > > >>>> design to a vote.
> > > >>>>
> > > >>>> Thanks!
> > > >>>> Paul
> > > >>>>
> > > >>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com
> >
> > <
> > > >> wangg...@gmail.com> wrote:
> > > >>>>
> > > >>>>
> > > >>>> Matthias / Paul,
> > > >>>>
> > > >>>> The concern I had about introducing `StoreBuilderSupplier` is
> simply
> > > >>>> because it is another XXSupplier to the public API, so I'd like to
> > ask
> > > >> if
> > > >>>> we really have to add it :)
> > > >>>>
> > > >>>> The difference between encapsulating the store name and
> > encapsulating
> > > >> the
> > > >>>> full state store builder is that, in the former:
> > > >>>>
> > > >>>> -----------
> > > >>>>
> > > >>>> String storeName = "store1";
> > > >>>> builder.addStore(new MyStoreBuilder(storeName));
> > > >>>> stream1.transform(new MyTransformerSupplier(storeName));   //
> > > following
> > > >>>>
> > > >>>> my
> > > >>>>
> > > >>>> proposal, that the store name can be passed in and used for both
> > > >>>> `listStores` and in the `Transformer#init`; so the Transformer
> > > function
> > > >>>> does not need to get the constant string name again.
> > > >>>>
> > > >>>>                          // one caveat to admit, is that
> > > >>>> MyTransofmerSupplier logic may be just unique to `store1` so it
> > cannot
> > > >> be
> > > >>>> reused with a different store name anyways.
> > > >>>> -----------
> > > >>>>
> > > >>>> While in the latter:
> > > >>>>
> > > >>>> -----------
> > > >>>>
> > > >>>> stream1.transform(new MyTransformerSupplierForStore1);   // the
> name
> > > is
> > > >>>> just indicating that we may have one such supplier for each store.
> > > >>>>
> > > >>>> -----------
> > > >>>>
> > > >>>> I understand the latter introduce more convenience from the API,
> but
> > > the
> > > >>>> cost is that since we still cannot completely `builder.addStore`,
> > but
> > > >>>>
> > > >>>> only
> > > >>>>
> > > >>>> reduce its semantic scope to shared state stores only,; hence
> users
> > > need
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> learn two ways of creating state stores for those two patterns.
> > > >>>>
> > > >>>> My argument is that more public APIs requires longer learning
> curve
> > > for
> > > >>>> users, and introduces more usage patterns that may confuse users
> > (the
> > > >>>> proposal I had tries to replace one with another completely).
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com>
> <
> > > >> pgwha...@gmail.com> wrote:
> > > >>>>
> > > >>>>
> > > >>>> Thanks for the great thoughts Matthias and Guozhang!
> > > >>>>
> > > >>>> If I'm not mistaken, Guozhang's suggestion is what my second
> > > >>>>
> > > >>>> alternative
> > > >>>>
> > > >>>> on
> > > >>>>
> > > >>>> the KIP is ("Have the added method on the Supplier interfaces only
> > > >>>>
> > > >>>> return
> > > >>>>
> > > >>>> store names, not builders").  I do think it would be a worthwhile
> > > >>>>
> > > >>>> usability
> > > >>>>
> > > >>>> improvement on its own, but to Matthias's point, it doesn't
> achieve
> > > the
> > > >>>> full goal of completing encapsulating a state store and it's
> > processor
> > > >>>>
> > > >>>> -
> > > >>>>
> > > >>>> it
> > > >>>>
> > > >>>> encapsulates the name, but not the StateStoreBuilder.
> > > >>>>
> > > >>>> I'm really intrigued by Matthias's idea that forgoes the default
> > > >>>>
> > > >>>> interface
> > > >>>>
> > > >>>> method I proposed.  Having smaller, separate interfaces is a
> > powerful
> > > >>>>
> > > >>>> idea
> > > >>>>
> > > >>>> and I think a cleaner API than what I proposed.  The non-shared
> > store
> > > >>>>
> > > >>>> use
> > > >>>>
> > > >>>> case is handled well here, and the shared store use case is
> > possible,
> > > >>>> though maybe still not as graceful as we would like (having to add
> > the
> > > >>>> StoreBuilderSupplier before the StoreNameSupplier seems maybe too
> > > >>>>
> > > >>>> subtle
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> me).
> > > >>>>
> > > >>>> We're all agreed that one of the big problems with the shared
> store
> > > use
> > > >>>> case is how to deal with adding the same store to the topology
> > > multiple
> > > >>>> times.  Catching the "store already added" exception is risky.
> > Here's
> > > >>>>
> > > >>>> a
> > > >>>>
> > > >>>> maybe radical idea: change `topology.addStateStore()` to be
> > idempotent
> > > >>>>
> > > >>>> for
> > > >>>>
> > > >>>> adding a given state store name and `StoreBuilder`.  In other
> words,
> > > >>>> `addStateStore` would not throw the "store already added"
> exception
> > if
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> `StoreBuilder` being added for a given name has the same identity
> as
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> one that has already been added.  Does this eliminate all the bugs
> > > >>>>
> > > >>>> we're
> > > >>>>
> > > >>>> worried about?  Thinking about it for a few minutes, it seems to
> > > >>>>
> > > >>>> eliminate
> > > >>>>
> > > >>>> most at least (would a user really use the exact same StoreBuilder
> > > when
> > > >>>> they intend there to be two stores?).  It might make the API
> > slightly
> > > >>>> harder to use if a user isn't immediately aware of that subtlety,
> > but
> > > a
> > > >>>> good error message should ease the pain, and it would happen
> > > >>>>
> > > >>>> immediately
> > > >>>>
> > > >>>> during development.
> > > >>>>
> > > >>>> And with regards to Matthias's comment about whether we need to
> > > >>>>
> > > >>>> deprecate
> > > >>>>
> > > >>>> existing varargs transform methods - I don't think we need to, but
> > it
> > > >>>>
> > > >>>> might
> > > >>>>
> > > >>>> be nice for there only to be one way to do things, assuming
> whatever
> > > we
> > > >>>> come up with supports all existing use cases.  I don't feel
> strongly
> > > >>>>
> > > >>>> about
> > > >>>>
> > > >>>> this, but if we don't deprecate, I do think it's important to add
> > > >>>>
> > > >>>> checks
> > > >>>>
> > > >>>> that prevent users from trying to do the same thing in two
> different
> > > >>>>
> > > >>>> ways,
> > > >>>>
> > > >>>> as we've discussed.
> > > >>>>
> > > >>>> Paul
> > > >>>>
> > > >>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <
> > > matth...@confluent.io
> > > >>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>
> > > >>>> Guozhang,
> > > >>>>
> > > >>>>
> > > >>>> Regarding the last option to catch "store exist already" exception
> > > >>>>
> > > >>>> and
> > > >>>>
> > > >>>> fallback to connect stores, I'm a bit concerned it may be hiding
> > > >>>>
> > > >>>> actual
> > > >>>>
> > > >>>> user bugs.
> > > >>>>
> > > >>>> I agree with this concern. From my original email:
> > > >>>>
> > > >>>>
> > > >>>> The only disadvantage I see, might be
> > > >>>> potential bugs about sharing state if two different stores are
> > > >>>>
> > > >>>> named
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> same by mistake (this would not be detected).
> > > >>>>
> > > >>>> For your new proposal: I am not sure if it addresses Paul's
> original
> > > >>>> idea -- I hope Paul can clarify. From my understanding, the idea
> was
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> encapsulate a store and its processor. As many stores are not
> > shared,
> > > >>>> this seems to be quite useful. Your proposal falls a little short
> to
> > > >>>> support encapsulation for none-shared stores.
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
> > > >>>>
> > > >>>> Matthias,
> > > >>>>
> > > >>>> Thanks for your feedbacks.
> > > >>>>
> > > >>>> Regarding the last option to catch "store exist already" exception
> > > >>>>
> > > >>>> and
> > > >>>>
> > > >>>> fallback to connect stores, I'm a bit concerned it may be hiding
> > > >>>>
> > > >>>> actual
> > > >>>>
> > > >>>> user bugs.
> > > >>>>
> > > >>>> Thinking about Paul's proposal and your suggestion again, I'd like
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> propose another alternative somewhere in the middle of your
> > > >>>>
> > > >>>> approaches,
> > > >>>>
> > > >>>> i.e. we still let users to create sharable state stores via
> > > >>>> `addStateStore`, and we allow the TransformerSupplier to return a
> > > >>>>
> > > >>>> list
> > > >>>>
> > > >>>> of
> > > >>>>
> > > >>>> state stores that it needs, i.e.:
> > > >>>>
> > > >>>> public interface TransformerSupplier<K, V, R> {
> > > >>>>     Transformer<K, V, R> get();
> > > >>>>     default List<String> stateStoreNames() {
> > > >>>>         return Collections.emptyList();
> > > >>>> <
> https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> > > >>>>
> > > >>>> ;>
> > > >>>>
> > > >>>>     }
> > > >>>> }
> > > >>>>
> > > >>>> by doing this users can still "consolidate" the references of
> store
> > > >>>>
> > > >>>> names
> > > >>>>
> > > >>>> in a single place in the transform call, e.g.:
> > > >>>>
> > > >>>> public class MyTransformerSupplier<K, V, R> {
> > > >>>>     private String storeName;
> > > >>>>
> > > >>>>     public class MyTransformer<K, V, R> {
> > > >>>>
> > > >>>>        ....
> > > >>>>
> > > >>>>        init() {
> > > >>>>           store = context.getStateStore(storeName);
> > > >>>>        }
> > > >>>>     }
> > > >>>>
> > > >>>>     default List<String> stateStoreNames() {
> > > >>>>         return Collections.singletonList(storeName);
> > > >>>> <
> https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> > > >>>>
> > > >>>> ;>
> > > >>>>
> > > >>>>     }
> > > >>>> }
> > > >>>>
> > > >>>> Basically, we move the parameters from the caller of `transform`
> to
> > > >>>>
> > > >>>> inside
> > > >>>>
> > > >>>> the TransformSuppliers. DSL implementations would not change much,
> > > >>>>
> > > >>>> simply
> > > >>>>
> > > >>>> calling `connectStateStore` by getting the list of names from the
> > > >>>>
> > > >>>> provided
> > > >>>>
> > > >>>> function.
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <
> > > >>>>
> > > >>>> matth...@confluent.io
> > > >>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>
> > > >>>> Just a meta comment: do we really need to deprecate existing
> > > >>>> `transform()` etc methods?
> > > >>>>
> > > >>>> The last argument is a vararg, and thus, just keeping the existing
> > > >>>>
> > > >>>> API
> > > >>>>
> > > >>>> for this part seems to work too, allowing to implement both
> > > >>>>
> > > >>>> patterns?
> > > >>>>
> > > >>>> Also, instead of adding a default method, we could also add a new
> > > >>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> > > >>>> stateStores()` -- users could implement `TransformerSupplier` and
> > > >>>> `StoreBuilderSupplier` at once; and for this case, we require that
> > > >>>>
> > > >>>> users
> > > >>>>
> > > >>>> don't provide store name in `transform()`.
> > > >>>>
> > > >>>> Similar, we could add an interface `StoreNameSupplier` with method
> > > >>>> `List<String> stateStores()`. This allows to "auto-wire" a
> > > >>>>
> > > >>>> transformer
> > > >>>>
> > > >>>> to existing stores (to avoid the issue to add the same store
> > > >>>>
> > > >>>> multiple
> > > >>>>
> > > >>>> times).
> > > >>>>
> > > >>>> Hence, for shared stores, there would be one "main" transformer
> > > >>>>
> > > >>>> that
> > > >>>>
> > > >>>> implements `StoreBuilderSupplier` and that must be added first to
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> topology. The other transformers would implement
> > > >>>>
> > > >>>> `StoreNameSupplier`
> > > >>>>
> > > >>>> and
> > > >>>>
> > > >>>> just connect to those stores.
> > > >>>>
> > > >>>> Another possibility to avoid the issue of adding the same stores
> > > >>>> multiple times would be, that the DSL always calls
> > > >>>>
> > > >>>> `addStateStore()`
> > > >>>>
> > > >>>> but
> > > >>>>
> > > >>>> catches a potential "store exists already" exception and falls
> > > >>>>
> > > >>>> back
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> `connectProcessorAndStateStore()` for this case. Thus, we would
> > > >>>>
> > > >>>> not
> > > >>>>
> > > >>>> need
> > > >>>>
> > > >>>> the `StoreNameSupplier` interface and the order in which
> > > >>>>
> > > >>>> transformers
> > > >>>>
> > > >>>> are added would not matter either. The only disadvantage I see,
> > > >>>>
> > > >>>> might
> > > >>>>
> > > >>>> be
> > > >>>>
> > > >>>> potential bugs about sharing state if two different stores are
> > > >>>>
> > > >>>> named
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> same by mistake (this would not be detected).
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Just some ideas I wanted to share. What do you think?
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
> > > >>>>
> > > >>>> Ah yes of course, this was an oversight, I completely ignored the
> > > >>>>
> > > >>>> multiple
> > > >>>>
> > > >>>> processors sharing the same state store when writing up the KIP.
> > > >>>>
> > > >>>> Which
> > > >>>>
> > > >>>> is
> > > >>>>
> > > >>>> funny, because I've actually done this (different processors
> > > >>>>
> > > >>>> sharing
> > > >>>>
> > > >>>> state
> > > >>>>
> > > >>>> stores) a fair amount myself, and I've settled on a pattern
> > > >>>>
> > > >>>> where I
> > > >>>>
> > > >>>> group
> > > >>>>
> > > >>>> the Processors in an enclosing class, and that enclosing class
> > > >>>>
> > > >>>> handles
> > > >>>>
> > > >>>> as
> > > >>>>
> > > >>>> much as possible.  Here's a gist showing the rough structure,
> > > >>>>
> > > >>>> just
> > > >>>>
> > > >>>> for
> > > >>>>
> > > >>>> context:
> > > >>>>
> > > >>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> > > >>>>
> > > >>>> . Note how it adds the stores to the topology, as well as
> > > >>>>
> > > >>>> providing a
> > > >>>>
> > > >>>> public method with the store names.
> > > >>>>
> > > >>>> I don't think my proposal completely conflicts with the multiple
> > > >>>>
> > > >>>> processors
> > > >>>>
> > > >>>> sharing state stores use case, since you can create a supplier
> > > >>>>
> > > >>>> that
> > > >>>>
> > > >>>> provides the store name you want, somewhat independently of your
> > > >>>>
> > > >>>> actual
> > > >>>>
> > > >>>> Processor logic.  The issue I do see though, is that
> > > >>>> topology.addStateStore() can only be called once for a given
> > > >>>>
> > > >>>> store.
> > > >>>>
> > > >>>> So
> > > >>>>
> > > >>>> for
> > > >>>>
> > > >>>> your example, if the there was a single TransformerSupplier that
> > > >>>>
> > > >>>> was
> > > >>>>
> > > >>>> passed
> > > >>>>
> > > >>>> into both transform() calls, "store1" would be added (under the
> > > >>>>
> > > >>>> hood)
> > > >>>>
> > > >>>> to
> > > >>>>
> > > >>>> the topology twice, which is no good.
> > > >>>>
> > > >>>> Perhaps this suggests that one of my alternatives on the KIP
> > > >>>>
> > > >>>> might
> > > >>>>
> > > >>>> be
> > > >>>>
> > > >>>> desirable: either not having the suppliers return StoreBuilders
> > > >>>>
> > > >>>> (just
> > > >>>>
> > > >>>> store
> > > >>>>
> > > >>>> names), or not deprecating the old methods that take "String...
> > > >>>> stateStoreNames". I'll have to think about it a bit.
> > > >>>>
> > > >>>> Paul
> > > >>>>
> > > >>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <
> > > >>>>
> > > >>>> wangg...@gmail.com>
> > > >>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>> Hello Paul,
> > > >>>>
> > > >>>> Thanks for the great writeup (very detailed and crystal
> > > >>>>
> > > >>>> motivation
> > > >>>>
> > > >>>> sections!).
> > > >>>>
> > > >>>> This is quite an interesting idea and I do like the API
> > > >>>>
> > > >>>> cleanness
> > > >>>>
> > > >>>> you
> > > >>>>
> > > >>>> proposed. The original motivation of letting StreamsTopology to
> > > >>>>
> > > >>>> add
> > > >>>>
> > > >>>> state
> > > >>>>
> > > >>>> stores though, is to allow different processors to share the
> > > >>>>
> > > >>>> state
> > > >>>>
> > > >>>> store.
> > > >>>>
> > > >>>> For example:
> > > >>>>
> > > >>>> builder.addStore("store1");
> > > >>>>
> > > >>>> // a path of stream transformations that leads to KStream
> > > >>>>
> > > >>>> stream1.
> > > >>>>
> > > >>>> stream1.transform(..., "store1");
> > > >>>>
> > > >>>> // another path that generates a KStream stream2.
> > > >>>> stream2.transform(..., "store1");
> > > >>>>
> > > >>>> Behind the scene, Streams will make sure stream1 / stream2
> > > >>>>
> > > >>>> transformations
> > > >>>>
> > > >>>> will always be grouped together as a single group of tasks, each
> > > >>>>
> > > >>>> of
> > > >>>>
> > > >>>> which
> > > >>>>
> > > >>>> will be executed by a single thread and hence there's no
> > > >>>>
> > > >>>> concurrency
> > > >>>>
> > > >>>> issues
> > > >>>>
> > > >>>> on accessing the store from different operators within the same
> > > >>>>
> > > >>>> task.
> > > >>>>
> > > >>>> I'm
> > > >>>>
> > > >>>> not sure how common this use case is, but I'd like to hear if
> > > >>>>
> > > >>>> you
> > > >>>>
> > > >>>> have
> > > >>>>
> > > >>>> any
> > > >>>>
> > > >>>> thoughts maintaining this since the current proposal seems
> > > >>>>
> > > >>>> exclude
> > > >>>>
> > > >>>> this
> > > >>>>
> > > >>>> possibility.
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> <
> > > >> pgwha...@gmail.com>
> > > >>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
> > > >>>>
> > > >>>> that
> > > >>>>
> > > >>>> I
> > > >>>>
> > > >>>> think could greatly increase the usability of the low-level
> > > >>>>
> > > >>>> processor
> > > >>>>
> > > >>>> API.
> > > >>>>
> > > >>>> I have some code written but will wait to see if there is buy
> > > >>>>
> > > >>>> in
> > > >>>>
> > > >>>> before
> > > >>>>
> > > >>>> going all out and creating a pull request.  It seems like most
> > > >>>>
> > > >>>> of
> > > >>>>
> > > >>>> the
> > > >>>>
> > > >>>> work
> > > >>>>
> > > >>>> would be in updating documentation and tests.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > > >>>>
> > > >>>> Thanks!
> > > >>>> Paul
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>

Reply via email to