Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass
on it and left some comments.

Overall I think I agree with you that passing in the storebuilder directly
that store name is more convienent as it does not require another
`addStore` call, but we just need to spend some more documentation effort
on educating users about the two ways of connecting their stores. I'm
slightly concerned about this education curve but I can be convinced if
most people felt it is worthy.


Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:

> I'd like to resurrect this discussion with a cursory, proof-of-concept
> implementation of the KIP which combines many of our ideas:
> https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
> small as possible for now, just using it to convey the main ideas.  But
> I'll separately address some of our earlier discussion:
>
>    - Will there be a new, separate interface for users to implement for the
>    new functionality? No, to hopefully keep things simple, all of the
>    Processor/TransformerSupplier interfaces will just extend
>    StateStoresSupplier, allowing users to opt in to this functionality by
>    overriding the default implementation that gives an empty list.
>    - Will the interface allow users to specify the store name, or the
>    entire StoreBuilder? The entire StoreBuilder, so the
>    Processor/TransformerSupplier can completely encapsulate name and
>    implementation of a state store if desired.
>    - Will the old way of specifying store names alongside the supplier when
>    calling stream.process/transform() be deprecated? No, this is still a
>    legitimate way to wire up Processors/Transformers and their stores. But
> I
>    would recommend not allowing stream.process/transform() calls that use
> both
>    store declaration mechanisms (this restriction is not in the proof of
>    concept)
>    - How will we handle adding the same state store to the topology
>    multiple times because different Processor/TransformerSuppliers declare
> it?
>    topology.addStateStore() will be slightly relaxed for convenience, and
> will
>    allow adding the same StoreBuilder multiple times as long as the exact
> same
>    StoreBuilder instance is being added for the same store name.  This
> seems
>    to prevent in practice the issue of accidentally making two state stores
>    one by adding with the same name.  For additional safety, if we wanted
> to
>    (not in the proof of concept), we could allow for this relaxation only
> for
>    internal callers of topology.addStateStore().
>
> So, in summary, the use cases look like:
>
>    - 1 transformer/processor that owns its store: Using the new
>    StateStoresSupplier interface method to supply its StoreBuilders that
> will
>    be added to the topology automatically.
>    - Multiple transformer/processors that share the same store: Either
>
>
>    1. The old way: the StoreBuilder is defined "far away" from the
>    Transformer/Processor implementations, and is added to the topology
>    manually by the user
>    2. The new way: the StoreBuilder is defined closer to the
>    Transformer/Processor implementations, and the same instance is
> returned by
>    all Transformer/ProcessorSuppliers that need it
>
>
> This makes the KIP wiki a bit stale; I'll update if we want to bring this
> design to a vote.
>
> Thanks!
> Paul
>
> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Matthias / Paul,
> >
> > The concern I had about introducing `StoreBuilderSupplier` is simply
> > because it is another XXSupplier to the public API, so I'd like to ask if
> > we really have to add it :)
> >
> > The difference between encapsulating the store name and encapsulating the
> > full state store builder is that, in the former:
> >
> > -----------
> >
> > String storeName = "store1";
> > builder.addStore(new MyStoreBuilder(storeName));
> > stream1.transform(new MyTransformerSupplier(storeName));   // following
> my
> > proposal, that the store name can be passed in and used for both
> > `listStores` and in the `Transformer#init`; so the Transformer function
> > does not need to get the constant string name again.
> >
> >                          // one caveat to admit, is that
> > MyTransofmerSupplier logic may be just unique to `store1` so it cannot be
> > reused with a different store name anyways.
> > -----------
> >
> > While in the latter:
> >
> > -----------
> >
> > stream1.transform(new MyTransformerSupplierForStore1);   // the name is
> > just indicating that we may have one such supplier for each store.
> >
> > -----------
> >
> > I understand the latter introduce more convenience from the API, but the
> > cost is that since we still cannot completely `builder.addStore`, but
> only
> > reduce its semantic scope to shared state stores only,; hence users need
> to
> > learn two ways of creating state stores for those two patterns.
> >
> > My argument is that more public APIs requires longer learning curve for
> > users, and introduces more usage patterns that may confuse users (the
> > proposal I had tries to replace one with another completely).
> >
> >
> > Guozhang
> >
> > On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >
> > > Thanks for the great thoughts Matthias and Guozhang!
> > >
> > > If I'm not mistaken, Guozhang's suggestion is what my second
> alternative
> > on
> > > the KIP is ("Have the added method on the Supplier interfaces only
> return
> > > store names, not builders").  I do think it would be a worthwhile
> > usability
> > > improvement on its own, but to Matthias's point, it doesn't achieve the
> > > full goal of completing encapsulating a state store and it's processor
> -
> > it
> > > encapsulates the name, but not the StateStoreBuilder.
> > >
> > > I'm really intrigued by Matthias's idea that forgoes the default
> > interface
> > > method I proposed.  Having smaller, separate interfaces is a powerful
> > idea
> > > and I think a cleaner API than what I proposed.  The non-shared store
> use
> > > case is handled well here, and the shared store use case is possible,
> > > though maybe still not as graceful as we would like (having to add the
> > > StoreBuilderSupplier before the StoreNameSupplier seems maybe too
> subtle
> > to
> > > me).
> > >
> > > We're all agreed that one of the big problems with the shared store use
> > > case is how to deal with adding the same store to the topology multiple
> > > times.  Catching the "store already added" exception is risky.  Here's
> a
> > > maybe radical idea: change `topology.addStateStore()` to be idempotent
> > for
> > > adding a given state store name and `StoreBuilder`.  In other words,
> > > `addStateStore` would not throw the "store already added" exception if
> > the
> > > `StoreBuilder` being added for a given name has the same identity as
> the
> > > one that has already been added.  Does this eliminate all the bugs
> we're
> > > worried about?  Thinking about it for a few minutes, it seems to
> > eliminate
> > > most at least (would a user really use the exact same StoreBuilder when
> > > they intend there to be two stores?).  It might make the API slightly
> > > harder to use if a user isn't immediately aware of that subtlety, but a
> > > good error message should ease the pain, and it would happen
> immediately
> > > during development.
> > >
> > > And with regards to Matthias's comment about whether we need to
> deprecate
> > > existing varargs transform methods - I don't think we need to, but it
> > might
> > > be nice for there only to be one way to do things, assuming whatever we
> > > come up with supports all existing use cases.  I don't feel strongly
> > about
> > > this, but if we don't deprecate, I do think it's important to add
> checks
> > > that prevent users from trying to do the same thing in two different
> > ways,
> > > as we've discussed.
> > >
> > > Paul
> > >
> > > On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > > > Guozhang,
> > > >
> > > > >> Regarding the last option to catch "store exist already" exception
> > and
> > > > >> fallback to connect stores, I'm a bit concerned it may be hiding
> > > actual
> > > > >> user bugs.
> > > >
> > > > I agree with this concern. From my original email:
> > > >
> > > > > The only disadvantage I see, might be
> > > > > potential bugs about sharing state if two different stores are
> named
> > > the
> > > > > same by mistake (this would not be detected).
> > > >
> > > >
> > > > For your new proposal: I am not sure if it addresses Paul's original
> > > > idea -- I hope Paul can clarify. From my understanding, the idea was
> to
> > > > encapsulate a store and its processor. As many stores are not shared,
> > > > this seems to be quite useful. Your proposal falls a little short to
> > > > support encapsulation for none-shared stores.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 12/15/18 1:40 AM, Guozhang Wang wrote:
> > > > > Matthias,
> > > > >
> > > > > Thanks for your feedbacks.
> > > > >
> > > > > Regarding the last option to catch "store exist already" exception
> > and
> > > > > fallback to connect stores, I'm a bit concerned it may be hiding
> > actual
> > > > > user bugs.
> > > > >
> > > > > Thinking about Paul's proposal and your suggestion again, I'd like
> to
> > > > > propose another alternative somewhere in the middle of your
> > approaches,
> > > > > i.e. we still let users to create sharable state stores via
> > > > > `addStateStore`, and we allow the TransformerSupplier to return a
> > list
> > > of
> > > > > state stores that it needs, i.e.:
> > > > >
> > > > > public interface TransformerSupplier<K, V, R> {
> > > > >     Transformer<K, V, R> get();
> > > > >     default List<String> stateStoreNames() {
> > > > >         return Collections.emptyList();
> > > > > <https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> ;>
> > > > >     }
> > > > > }
> > > > >
> > > > > by doing this users can still "consolidate" the references of store
> > > names
> > > > > in a single place in the transform call, e.g.:
> > > > >
> > > > > public class MyTransformerSupplier<K, V, R> {
> > > > >     private String storeName;
> > > > >
> > > > >     public class MyTransformer<K, V, R> {
> > > > >
> > > > >        ....
> > > > >
> > > > >        init() {
> > > > >           store = context.getStateStore(storeName);
> > > > >        }
> > > > >     }
> > > > >
> > > > >     default List<String> stateStoreNames() {
> > > > >         return Collections.singletonList(storeName);
> > > > > <https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> ;>
> > > > >     }
> > > > > }
> > > > >
> > > > > Basically, we move the parameters from the caller of `transform` to
> > > > inside
> > > > > the TransformSuppliers. DSL implementations would not change much,
> > > simply
> > > > > calling `connectStateStore` by getting the list of names from the
> > > > provided
> > > > > function.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > > > wrote:
> > > > >
> > > > >> Just a meta comment: do we really need to deprecate existing
> > > > >> `transform()` etc methods?
> > > > >>
> > > > >> The last argument is a vararg, and thus, just keeping the existing
> > API
> > > > >> for this part seems to work too, allowing to implement both
> > patterns?
> > > > >>
> > > > >> Also, instead of adding a default method, we could also add a new
> > > > >> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> > > > >> stateStores()` -- users could implement `TransformerSupplier` and
> > > > >> `StoreBuilderSupplier` at once; and for this case, we require that
> > > users
> > > > >> don't provide store name in `transform()`.
> > > > >>
> > > > >> Similar, we could add an interface `StoreNameSupplier` with method
> > > > >> `List<String> stateStores()`. This allows to "auto-wire" a
> > transformer
> > > > >> to existing stores (to avoid the issue to add the same store
> > multiple
> > > > >> times).
> > > > >>
> > > > >> Hence, for shared stores, there would be one "main" transformer
> that
> > > > >> implements `StoreBuilderSupplier` and that must be added first to
> > the
> > > > >> topology. The other transformers would implement
> `StoreNameSupplier`
> > > and
> > > > >> just connect to those stores.
> > > > >>
> > > > >> Another possibility to avoid the issue of adding the same stores
> > > > >> multiple times would be, that the DSL always calls
> `addStateStore()`
> > > but
> > > > >> catches a potential "store exists already" exception and falls
> back
> > to
> > > > >> `connectProcessorAndStateStore()` for this case. Thus, we would
> not
> > > need
> > > > >> the `StoreNameSupplier` interface and the order in which
> > transformers
> > > > >> are added would not matter either. The only disadvantage I see,
> > might
> > > be
> > > > >> potential bugs about sharing state if two different stores are
> named
> > > the
> > > > >> same by mistake (this would not be detected).
> > > > >>
> > > > >>
> > > > >>
> > > > >> Just some ideas I wanted to share. What do you think?
> > > > >>
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 12/11/18 3:46 AM, Paul Whalen wrote:
> > > > >>> Ah yes of course, this was an oversight, I completely ignored the
> > > > >> multiple
> > > > >>> processors sharing the same state store when writing up the KIP.
> > > Which
> > > > >> is
> > > > >>> funny, because I've actually done this (different processors
> > sharing
> > > > >> state
> > > > >>> stores) a fair amount myself, and I've settled on a pattern
> where I
> > > > group
> > > > >>> the Processors in an enclosing class, and that enclosing class
> > > handles
> > > > as
> > > > >>> much as possible.  Here's a gist showing the rough structure,
> just
> > > for
> > > > >>> context:
> > > > >> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> > > > >>> . Note how it adds the stores to the topology, as well as
> > providing a
> > > > >>> public method with the store names.
> > > > >>>
> > > > >>> I don't think my proposal completely conflicts with the multiple
> > > > >> processors
> > > > >>> sharing state stores use case, since you can create a supplier
> that
> > > > >>> provides the store name you want, somewhat independently of your
> > > actual
> > > > >>> Processor logic.  The issue I do see though, is that
> > > > >>> topology.addStateStore() can only be called once for a given
> store.
> > > So
> > > > >> for
> > > > >>> your example, if the there was a single TransformerSupplier that
> > was
> > > > >> passed
> > > > >>> into both transform() calls, "store1" would be added (under the
> > hood)
> > > > to
> > > > >>> the topology twice, which is no good.
> > > > >>>
> > > > >>> Perhaps this suggests that one of my alternatives on the KIP
> might
> > be
> > > > >>> desirable: either not having the suppliers return StoreBuilders
> > (just
> > > > >> store
> > > > >>> names), or not deprecating the old methods that take "String...
> > > > >>> stateStoreNames". I'll have to think about it a bit.
> > > > >>>
> > > > >>> Paul
> > > > >>>
> > > > >>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > >> wrote:
> > > > >>>
> > > > >>>> Hello Paul,
> > > > >>>>
> > > > >>>> Thanks for the great writeup (very detailed and crystal
> motivation
> > > > >>>> sections!).
> > > > >>>>
> > > > >>>> This is quite an interesting idea and I do like the API
> cleanness
> > > you
> > > > >>>> proposed. The original motivation of letting StreamsTopology to
> > add
> > > > >> state
> > > > >>>> stores though, is to allow different processors to share the
> state
> > > > >> store.
> > > > >>>> For example:
> > > > >>>>
> > > > >>>> builder.addStore("store1");
> > > > >>>>
> > > > >>>> // a path of stream transformations that leads to KStream
> stream1.
> > > > >>>> stream1.transform(..., "store1");
> > > > >>>>
> > > > >>>> // another path that generates a KStream stream2.
> > > > >>>> stream2.transform(..., "store1");
> > > > >>>>
> > > > >>>> Behind the scene, Streams will make sure stream1 / stream2
> > > > >> transformations
> > > > >>>> will always be grouped together as a single group of tasks, each
> > of
> > > > >> which
> > > > >>>> will be executed by a single thread and hence there's no
> > concurrency
> > > > >> issues
> > > > >>>> on accessing the store from different operators within the same
> > > task.
> > > > >> I'm
> > > > >>>> not sure how common this use case is, but I'd like to hear if
> you
> > > have
> > > > >> any
> > > > >>>> thoughts maintaining this since the current proposal seems
> exclude
> > > > this
> > > > >>>> possibility.
> > > > >>>>
> > > > >>>>
> > > > >>>> Guozhang
> > > > >>>>
> > > > >>>>
> > > > >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
> > > > wrote:
> > > > >>>>
> > > > >>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
> > > that
> > > > I
> > > > >>>>> think could greatly increase the usability of the low-level
> > > processor
> > > > >>>> API.
> > > > >>>>> I have some code written but will wait to see if there is buy
> in
> > > > before
> > > > >>>>> going all out and creating a pull request.  It seems like most
> of
> > > the
> > > > >>>> work
> > > > >>>>> would be in updating documentation and tests.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > > > >>>>>
> > > > >>>>> Thanks!
> > > > >>>>> Paul
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> --
> > > > >>>> -- Guozhang
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to